fix: Set batching settings on SinglePartitionPublisher #563

Merged
anguillanneuf merged 1 commit into master from fix-publish-beam on Mar 29, 2021

@dpcollins-google dpcollins-google commented Mar 29, 2021

With this fix, writing to a Lite topic works.
However, a pipeline that writes from a bounded source to a Lite topic does not exit.
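
The diff itself is not reproduced in this conversation. As a rough illustration of the kind of problem the title points at, here is a self-contained sketch, not the library's code, of a builder that refuses to construct a publisher until batching settings are supplied. Every name in it (`BatchingSketch`, `BatchingConfig`, `SketchPublisher`, `Builder`, `setBatchingConfig`) is hypothetical, and whether the real `SinglePartitionPublisher` builder failed in this way is an assumption, not something stated in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Illustrative only: all names are hypothetical and not part of java-pubsublite.
public class BatchingSketch {

  /** Stand-in for batching settings: flush once this many messages are pending. */
  static final class BatchingConfig {
    final int maxMessagesPerBatch;

    BatchingConfig(int maxMessagesPerBatch) {
      if (maxMessagesPerBatch <= 0) {
        throw new IllegalArgumentException("maxMessagesPerBatch must be positive");
      }
      this.maxMessagesPerBatch = maxMessagesPerBatch;
    }
  }

  /** Stand-in publisher that groups messages into batches before "sending" them. */
  static final class SketchPublisher {
    private final BatchingConfig config;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> sentBatches = new ArrayList<>();

    private SketchPublisher(BatchingConfig config) {
      this.config = config;
    }

    void publish(String message) {
      pending.add(message);
      if (pending.size() >= config.maxMessagesPerBatch) {
        flush();
      }
    }

    /** Sends whatever is pending as one batch; does nothing if nothing is pending. */
    void flush() {
      if (pending.isEmpty()) {
        return;
      }
      sentBatches.add(new ArrayList<>(pending));
      pending.clear();
    }

    List<List<String>> sentBatches() {
      return sentBatches;
    }
  }

  /** Builder that treats the batching configuration as a required property. */
  static final class Builder {
    private Optional<BatchingConfig> batching = Optional.empty();

    Builder setBatchingConfig(BatchingConfig config) {
      this.batching = Optional.of(config);
      return this;
    }

    SketchPublisher build() {
      BatchingConfig config =
          batching.orElseThrow(
              () -> new IllegalStateException("Missing required property: batchingConfig"));
      return new SketchPublisher(config);
    }
  }

  public static void main(String[] args) {
    try {
      new Builder().build(); // no batching config supplied, so build() throws
    } catch (IllegalStateException e) {
      System.out.println("build failed: " + e.getMessage());
    }

    SketchPublisher publisher =
        new Builder().setBatchingConfig(new BatchingConfig(2)).build();
    publisher.publish("Hello!");
    publisher.publish("Hello again");
    publisher.flush();
    System.out.println("batches sent: " + publisher.sentBatches());
  }
}
```

In this sketch the caller that wires up the publisher is responsible for supplying the batching configuration. Forgetting it surfaces as a failure at build time rather than at publish time.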

@dpcollins-google dpcollins-google requested a review from a team March 29, 2021 19:50
@product-auto-label product-auto-label Bot added the "api: pubsublite" label (Issues related to the googleapis/java-pubsublite API.) Mar 29, 2021
@google-cla google-cla Bot added the "cla: yes" label (This human has signed the Contributor License Agreement.) Mar 29, 2021

codecov Bot commented Mar 29, 2021

Codecov Report

Merging #563 (00c6444) into master (343afbf) will decrease coverage by 0.01%.
The diff coverage is 0.00%.

@@             Coverage Diff              @@
##             master     #563      +/-   ##
============================================
- Coverage     72.63%   72.62%   -0.02%     
  Complexity      861      861              
============================================
  Files           156      156              
  Lines          4488     4489       +1     
  Branches        211      211              
============================================
  Hits           3260     3260              
- Misses         1107     1108       +1     
  Partials        121      121              
| Impacted Files | Coverage Δ | Complexity Δ |
|---|---|---|
| ...a/com/google/cloud/pubsublite/beam/Publishers.java | 0.00% <0.00%> (ø) | 0.00 <0.00> (ø) |

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 343afbf...00c6444.

@anguillanneuf anguillanneuf left a comment

I tested this with a simple example like the one below and it worked, but the pipeline doesn't exit with a bounded source. @dpcollins-google is aware of this and will look into it a bit later.

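import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.beam.PubsubLiteIO;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.common.collect.ImmutableListMultimap;
import com.google.protobuf.ByteString;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
// ...
    pipeline
        // Bounded source: two in-memory strings.
        .apply(Create.of("Hello!", "Hello again"))
        // Convert each string into a Pub/Sub Lite message proto.
        .apply(ParDo.of(new DoFn<String, PubSubMessage>() {
          @ProcessElement
          public void processElement(ProcessContext context) {
            // Attach a single attribute to every message.
            ImmutableListMultimap<String, ByteString> multimap =
                ImmutableListMultimap.of("Year", ByteString.copyFromUtf8("2021"));
            Message message = Message.builder()
                .setData(ByteString.copyFromUtf8(context.element()))
                .setAttributes(multimap)
                //.setEventTime(fromMillis(System.currentTimeMillis()))
                .build();
            LOG.info(Instant.now() + "\t" + message);
            context.output(message.toProto());
          }
        }))
        // publisherOptions is defined in the elided part of the example.
        .apply("Write to Pub/Sub Lite", PubsubLiteIO.write(publisherOptions));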

@anguillanneuf anguillanneuf merged commit 3b099d0 into master Mar 29, 2021
@anguillanneuf anguillanneuf deleted the fix-publish-beam branch March 29, 2021 21:23