From 431801d876fd8ff4392bf4432a01d5529e409b9c Mon Sep 17 00:00:00 2001 From: dpcollins-google Date: Wed, 20 May 2020 10:36:20 -0400 Subject: [PATCH] fix: Propogate context in SinglePartitionPublisherBuilder. --- .../internal/wire/SinglePartitionPublisherBuilder.java | 5 ++++- .../com/google/cloud/pubsublite/beam/PubsubLiteSink.java | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java index 54c5029b2..5fbd3b1c4 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java @@ -64,7 +64,10 @@ public abstract static class Builder { public SinglePartitionPublisher build() throws StatusException { SinglePartitionPublisherBuilder builder = autoBuild(); PublisherBuilder.Builder publisherBuilder = - PublisherBuilder.builder().setTopic(builder.topic()).setPartition(builder.partition()); + PublisherBuilder.builder() + .setTopic(builder.topic()) + .setPartition(builder.partition()) + .setContext(builder.context()); builder.stub().ifPresent(publisherBuilder::setStub); builder.batchingSettings().ifPresent(publisherBuilder::setBatching); return new SinglePartitionPublisher(publisherBuilder.build(), builder.partition()); diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteSink.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteSink.java index 625f42e12..b11004a30 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteSink.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteSink.java @@ -115,7 +115,8 @@ public void onFailure(Throwable t) { errorsSinceLastFinish.push(ExtractStatus.toCanonical(t)); } } - }, executor); + }, + executor); } // Intentionally don't flush on bundle finish to allow multi-sink client reuse.