Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.google.cloud.pubsublite;

import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.google.cloud.pubsublite.internal;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.google.cloud.pubsublite.internal;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
Expand All @@ -32,8 +33,6 @@
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Optional;
import org.threeten.bp.Duration;

Expand Down Expand Up @@ -110,19 +109,14 @@ public Publisher<Offset> build() throws ApiException {
serviceClient = autoBuilt.serviceClient().get();
} else {
try {
Map<String, String> metadata = autoBuilt.context().getMetadata();
Map<String, String> routingMetadata =
RoutingMetadata.of(autoBuilt.topic(), autoBuilt.partition());
Map<String, String> allMetadata =
ImmutableMap.<String, String>builder()
.putAll(metadata)
.putAll(routingMetadata)
.build();
PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder();
addDefaultMetadata(
autoBuilt.context(),
RoutingMetadata.of(autoBuilt.topic(), autoBuilt.partition()),
settingsBuilder);
serviceClient =
PublisherServiceClient.create(
addDefaultSettings(
autoBuilt.topic().location().region(),
PublisherServiceSettings.newBuilder().setHeaderProvider(() -> allMetadata)));
addDefaultSettings(autoBuilt.topic().location().region(), settingsBuilder));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,43 @@
import java.nio.charset.StandardCharsets;
import java.util.Map;

final class RoutingMetadata {
private RoutingMetadata() {}
public final class RoutingMetadata {

static final String PARAMS_HEADER = "x-goog-request-params";
private static final String PARAMS_HEADER = "x-goog-request-params";
private final Map<String, String> metadata;

static Map<String, String> of(TopicPath topic, Partition partition) throws ApiException {
public static RoutingMetadata of(TopicPath topic, Partition partition) throws ApiException {
return new RoutingMetadata(topic, partition);
}

public static RoutingMetadata of(SubscriptionPath subscription, Partition partition)
throws ApiException {
return new RoutingMetadata(subscription, partition);
}

private RoutingMetadata(TopicPath topic, Partition partition) {
try {
String topic_value = URLEncoder.encode(topic.toString(), StandardCharsets.UTF_8.toString());
String params = String.format("partition=%s&topic=%s", partition.value(), topic_value);
return ImmutableMap.of(PARAMS_HEADER, params);
this.metadata = ImmutableMap.of(PARAMS_HEADER, params);
} catch (UnsupportedEncodingException e) {
throw toCanonical(e).underlying;
}
}

static Map<String, String> of(SubscriptionPath subscription, Partition partition)
throws ApiException {
private RoutingMetadata(SubscriptionPath subscription, Partition partition) {
try {
String subscription_value =
URLEncoder.encode(subscription.toString(), StandardCharsets.UTF_8.toString());
String params =
String.format("partition=%s&subscription=%s", partition.value(), subscription_value);
return ImmutableMap.of(PARAMS_HEADER, params);
this.metadata = ImmutableMap.of(PARAMS_HEADER, params);
} catch (UnsupportedEncodingException e) {
throw toCanonical(e).underlying;
}
}

public Map<String, String> getMetadata() {
return metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal;
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;

Expand All @@ -25,6 +25,8 @@
import com.google.api.gax.rpc.ClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.Endpoints;
import com.google.cloud.pubsublite.internal.Lazy;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.threeten.bp.Duration;
Expand Down Expand Up @@ -60,4 +62,18 @@ Settings addDefaultSettings(CloudRegion target, Builder builder) throws ApiExcep
throw toCanonical(t).underlying;
}
}

// Adds context routing metadata for publisher or subscriber.
public static <
Settings extends ClientSettings<Settings>,
Builder extends ClientSettings.Builder<Settings, Builder>>
Builder addDefaultMetadata(
PubsubContext context, RoutingMetadata routingMetadata, Builder builder) {
return builder.setHeaderProvider(
() ->
ImmutableMap.<String, String>builder()
.putAll(context.getMetadata())
.putAll(routingMetadata.getMetadata())
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
Expand All @@ -28,8 +29,6 @@
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

Expand Down Expand Up @@ -77,19 +76,16 @@ public Subscriber build() throws ApiException {
serviceClient = autoBuilt.serviceClient().get();
} else {
try {
Map<String, String> metadata = autoBuilt.context().getMetadata();
Map<String, String> routingMetadata =
RoutingMetadata.of(autoBuilt.subscriptionPath(), autoBuilt.partition());
Map<String, String> allMetadata =
ImmutableMap.<String, String>builder()
.putAll(metadata)
.putAll(routingMetadata)
.build();
SubscriberServiceSettings.Builder settingsBuilder =
SubscriberServiceSettings.newBuilder();
addDefaultMetadata(
autoBuilt.context(),
RoutingMetadata.of(autoBuilt.subscriptionPath(), autoBuilt.partition()),
settingsBuilder);
serviceClient =
SubscriberServiceClient.create(
addDefaultSettings(
autoBuilt.subscriptionPath().location().region(),
SubscriberServiceSettings.newBuilder().setHeaderProvider(() -> allMetadata)));
autoBuilt.subscriptionPath().location().region(), settingsBuilder));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.function.Consumer;

public interface PartitionSubscriberFactory extends Serializable {
Subscriber newSubscriber(
Partition partition, Consumer<ImmutableList<SequencedMessage>> message_consumer)
throws ApiException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
import java.io.Serializable;
Expand All @@ -33,14 +32,17 @@
public class PslContinuousInputPartition
implements ContinuousInputPartition<InternalRow>, Serializable {

private final SubscriberFactory subscriberFactory;
Comment thread
jiangmichaellll marked this conversation as resolved.
private final SparkPartitionOffset startOffset;
private final SubscriptionPath subscriptionPath;
private final FlowControlSettings flowControlSettings;

public PslContinuousInputPartition(
SubscriberFactory subscriberFactory,
SparkPartitionOffset startOffset,
SubscriptionPath subscriptionPath,
FlowControlSettings flowControlSettings) {
this.subscriberFactory = subscriberFactory;
this.startOffset = startOffset;
this.subscriptionPath = subscriptionPath;
this.flowControlSettings = flowControlSettings;
Expand All @@ -59,14 +61,7 @@ public InputPartitionReader<InternalRow> createContinuousReader(PartitionOffset
try {
subscriber =
new BlockingPullSubscriberImpl(
// TODO(jiangmichael): Pass credentials settings here.
(consumer) ->
SubscriberBuilder.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setPartition(pslPartitionOffset.partition())
.setContext(PubsubContext.of(Constants.FRAMEWORK))
.setMessageConsumer(consumer)
.build(),
subscriberFactory,
flowControlSettings,
SeekRequest.newBuilder()
.setCursor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class PslContinuousReader implements ContinuousReader {

private final CursorClient cursorClient;
private final MultiPartitionCommitter committer;
private final PartitionSubscriberFactory partitionSubscriberFactory;
private final SubscriptionPath subscriptionPath;
private final FlowControlSettings flowControlSettings;
private final long topicPartitionCount;
Expand All @@ -44,11 +45,13 @@ public class PslContinuousReader implements ContinuousReader {
public PslContinuousReader(
CursorClient cursorClient,
MultiPartitionCommitter committer,
PartitionSubscriberFactory partitionSubscriberFactory,
SubscriptionPath subscriptionPath,
FlowControlSettings flowControlSettings,
long topicPartitionCount) {
this.cursorClient = cursorClient;
this.committer = committer;
this.partitionSubscriberFactory = partitionSubscriberFactory;
this.subscriptionPath = subscriptionPath;
this.flowControlSettings = flowControlSettings;
this.topicPartitionCount = topicPartitionCount;
Expand Down Expand Up @@ -104,10 +107,12 @@ public StructType readSchema() {

@Override
public List<InputPartition<InternalRow>> planInputPartitions() {

return startOffset.getPartitionOffsetMap().values().stream()
.map(
v ->
new PslContinuousInputPartition(
(consumer) -> partitionSubscriberFactory.newSubscriber(v.partition(), consumer),
SparkPartitionOffset.builder()
.partition(v.partition())
.offset(v.offset())
Expand Down
Loading