diff --git a/.gitattributes b/.gitattributes
deleted file mode 100644
index c39158cf00..0000000000
--- a/.gitattributes
+++ /dev/null
@@ -1,40 +0,0 @@
-# Copyright (C) 2017 Google Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not
-# use this file except in compliance with the License. You may obtain a copy of
-# the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations under
-# the License.
-
-# The default behavior, which overrides 'core.autocrlf', is to use Git's
-# built-in heuristics to determine whether a particular file is text or binary.
-# Text files are automatically normalized to the user's platforms.
-* text=auto
-
-# Explicitly declare text files that should always be normalized and converted
-# to native line endings.
-.gitattributes text
-.gitignore text
-LICENSE text
-*.avsc text
-*.html text
-*.java text
-*.md text
-*.properties text
-*.proto text
-*.py text
-*.sh text
-*.xml text
-*.yml text
-
-# Declare files that will always have CRLF line endings on checkout.
-# *.sln text eol=crlf
-
-# Explicitly denote all files that are truly binary and should not be modified.
-# *.jpg binary
diff --git a/.gitignore b/.gitignore
deleted file mode 100644
index 2a27023c28..0000000000
--- a/.gitignore
+++ /dev/null
@@ -1,30 +0,0 @@
-# Copyright (C) 2017 Google Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not
-# use this file except in compliance with the License. You may obtain a copy of
-# the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations under
-# the License.
-
-target/
-
-# Ignore IntelliJ files.
-.idea/
-*.iml
-*.ipr
-*.iws
-
-# Ignore Eclipse files.
-.classpath
-.project
-.settings/
-
-# The build process generates the dependency-reduced POM, but it shouldn't be
-# committed.
-dependency-reduced-pom.xml
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index 8fa5d9a932..0000000000
--- a/.travis.yml
+++ /dev/null
@@ -1,54 +0,0 @@
-# Copyright (C) 2017 Google Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not
-# use this file except in compliance with the License. You may obtain a copy of
-# the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations under
-# the License.
-
-language: java
-
-sudo: false
-
-notifications:
- email:
- # Group email notifications are disabled for now, since we cannot do it on a per-branch basis.
- # Right now, it would trigger a notification for each fork, which generates a lot of spam.
- # recipients:
- # - dataflow-sdk-build-notifications+travis@google.com
- on_success: change
- on_failure: always
-
-matrix:
- include:
- # On OSX, run with default JDK only.
- - os: osx
- # On Linux, run with specific JDKs only.
- - os: linux
- env: CUSTOM_JDK="oraclejdk8"
- # The distribution does not build with Java 7 by design. We need to rewrite these tests
- # to, for example, build and install with Java 8 and then test examples with Java 7.
- # - os: linux
- # env: CUSTOM_JDK="oraclejdk7"
- # - os: linux
- # env: CUSTOM_JDK="openjdk7"
-
-before_install:
- - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi
- - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi
-
-install:
- - travis_retry mvn install clean -U -DskipTests=true
-
-script:
- # Verify that the project can be built and installed.
- - mvn install
- # Verify that starter and examples archetypes have the correct version of the NOTICE file.
- - diff -q NOTICE maven-archetypes/starter/src/main/resources/NOTICE
- - diff -q NOTICE maven-archetypes/examples/src/main/resources/NOTICE
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
deleted file mode 100644
index 9b616e5fe3..0000000000
--- a/CONTRIBUTING.md
+++ /dev/null
@@ -1,51 +0,0 @@
-
-
-Want to contribute? Great! First, read this page (including the small print at
-the end).
-
-Google Cloud Dataflow SDK is a distribution of Apache Beam. If you'd like to
-change anything under the `org.apache.beam.*` namespace, please submit that
-change directly to the [Apache Beam](https://github.com/apache/beam) project.
-
-This repository contains code to build the Dataflow distribution of Beam, and
-some Dataflow-specific code. Only changes to how the distribution is built, or
-the Dataflow-specific code under the `com.google.cloud.dataflow.*` namespace,
-can be merged here.
-
-### Before you contribute
-Before we can use your code, you must sign the
-[Google Individual Contributor License Agreement](https://developers.google.com/open-source/cla/individual?csw=1)
-(CLA), which you can do online. The CLA is necessary mainly because you own the
-copyright to your changes, even after your contribution becomes part of our
-codebase, so we need your permission to use and distribute your code. We also
-need to be sure of various other things. For instance that you'll tell us if you
-know that your code infringes on other people's patents. You don't have to sign
-the CLA until after you've submitted your code for review and a member has
-approved it, but you must do it before we can put your code into our codebase.
-
-Before you start working on a larger contribution, we recommend to get in touch
-with us first through the issue tracker with your idea so that we can help out
-and possibly guide you. Coordinating up front makes it much easier to avoid
-frustration later on.
-
-### Code reviews
-All submissions, including submissions by project members, require review. We
-use GitHub pull requests for this purpose.
-
-### The small print
-Contributions made by corporations are covered by a different agreement than
-the one above, the Software Grant and Corporate Contributor License Agreement.
diff --git a/LICENSE b/LICENSE
deleted file mode 100644
index d645695673..0000000000
--- a/LICENSE
+++ /dev/null
@@ -1,202 +0,0 @@
-
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright [yyyy] [name of copyright owner]
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
diff --git a/NOTICE b/NOTICE
deleted file mode 100644
index 981fde5a9e..0000000000
--- a/NOTICE
+++ /dev/null
@@ -1,5 +0,0 @@
-Google Cloud Dataflow SDK for Java
-Copyright 2017, Google Inc.
-
-This product includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
diff --git a/README.md b/README.md
index 112df59d01..dfb630ad79 100644
--- a/README.md
+++ b/README.md
@@ -16,86 +16,28 @@
# Google Cloud Dataflow SDK for Java
-[Google Cloud Dataflow](https://cloud.google.com/dataflow/) provides a simple,
-powerful programming model for building both batch and streaming parallel data
-processing pipelines.
+[Google Cloud Dataflow](https://cloud.google.com/dataflow/) is a service for executing [Apache Beam](https://beam.apache.org) pipelines on Google Cloud Platform.
-Dataflow SDK for Java is a distribution of a portion of the
-[Apache Beam](https://beam.apache.org) project. This repository hosts the
-code to build this distribution and any Dataflow-specific code/modules. The
-underlying source code is hosted in the
-[Apache Beam repository](https://github.com/apache/beam).
-
-[General usage](https://cloud.google.com/dataflow/getting-started) of Google
-Cloud Dataflow does **not** require use of this repository. Instead, you can do
-any one of the following:
-
-1. Depend directly on a specific
-[version](https://cloud.google.com/dataflow/downloads) of the SDK in
-the [Maven Central Repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.google.cloud.dataflow%22)
-by adding the following dependency to development
-environments like Eclipse or Apache Maven:
-
-
- com.google.cloud.dataflow
- google-cloud-dataflow-java-sdk-all
- version_number
-
-
-1. Download the example pipelines from the separate
-[DataflowJavaSDK-examples](https://github.com/GoogleCloudPlatform/DataflowJavaSDK-examples)
-repository.
-
-1. If you are using [Eclipse](https://eclipse.org/) integrated development
-environment (IDE), the
-[Cloud Dataflow Plugin for Eclipse](https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-eclipse)
-provides tools to create and execute Dataflow pipelines inside Eclipse.
-
-## Status [](https://travis-ci.org/GoogleCloudPlatform/DataflowJavaSDK)
-
-Both the SDK and the Dataflow Service are generally available and considered
-stable and fully qualified for production use.
-
-This [`master`](https://github.com/GoogleCloudPlatform/DataflowJavaSDK/) branch
-contains code to build Dataflow SDK 2.0.0 and newer, as a distribution of Apache
-Beam. Pre-Beam SDKs, versions 1.x, are maintained in the
-[`master-1.x`](https://github.com/GoogleCloudPlatform/DataflowJavaSDK/tree/master-1.x)
-branch.
-
-## Overview
-
-The key concepts in this programming model are:
-
-* `PCollection`: represents a collection of data, which could be bounded or
-unbounded in size.
-* `PTransform`: represents a computation that transforms input PCollections
-into output PCollections.
-* `Pipeline`: manages a directed acyclic graph of PTransforms and PCollections
-that is ready for execution.
-* `PipelineRunner`: specifies where and how the pipeline should execute.
-
-We provide two runners:
-
- 1. The `DirectRunner` runs the pipeline on your local machine.
- 1. The `DataflowRunner` submits the pipeline to the Cloud Dataflow Service,
-where it runs using managed resources in the
-[Google Cloud Platform](https://cloud.google.com).
+## Getting Started
-The SDK is built to be extensible and support additional execution environments
-beyond local execution and the Google Cloud Dataflow Service. Apache Beam
-contains additional SDKs, runners, and IO connectors.
+* [Quickstart Using Java](https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven) on Google Cloud Dataflow
+* [Java API Reference](https://beam.apache.org/documentation/sdks/javadoc/)
+* [Java Examples](https://github.com/apache/beam/tree/master/examples/java)
-## Getting Started
+## We moved to Apache Beam!
+Apache Beam Java SDK and the code development moved to the [Apache Beam repo](https://github.com/apache/beam/tree/master/sdks/java).
-Please try our [Quickstarts](https://cloud.google.com/dataflow/docs/quickstarts).
+If you want to contribute to the project (please do!) use this [Apache Beam contributor's guide](http://beam.apache.org/contribution-guide/)
## Contact Us
-We welcome all usage-related questions on [Stack Overflow](http://stackoverflow.com/questions/tagged/google-cloud-dataflow)
+We welcome all usage-related questions on
+[Stack Overflow](https://stackoverflow.com/questions/tagged/google-cloud-dataflow)
tagged with `google-cloud-dataflow`.
-Please use [issue tracker](https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues)
-on GitHub to report any bugs, comments or questions regarding SDK development.
+Please use the
+[issue tracker](https://issues.apache.org/jira/browse/BEAM)
+on Apache JIRA to report any bugs, comments or questions regarding SDK development.
## More Information
diff --git a/examples/pom.xml b/examples/pom.xml
deleted file mode 100644
index f87ae36b1d..0000000000
--- a/examples/pom.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-
-
-
- 4.0.0
-
-
- com.google.cloud.dataflow
- google-cloud-dataflow-java-sdk-parent
- 2.2.0-SNAPSHOT
-
-
- google-cloud-dataflow-java-examples-all
- Google Cloud Dataflow Java Examples - All
- Google Cloud Dataflow SDK for Java is a distribution of Apache
- Beam designed to simplify usage of Apache Beam on Google Cloud Dataflow
- service. This artifact includes all Dataflow Java SDK
- examples.
-
- jar
-
-
-
- com.google.cloud.dataflow
- google-cloud-dataflow-java-sdk-all
-
-
-
- org.apache.beam
- beam-examples-java
-
-
-
- org.apache.beam
- beam-examples-java8
-
-
-
diff --git a/examples/src/main/java/com/google/cloud/dataflow/sdk/ExamplesDependencies.java b/examples/src/main/java/com/google/cloud/dataflow/sdk/ExamplesDependencies.java
deleted file mode 100644
index 827aff8395..0000000000
--- a/examples/src/main/java/com/google/cloud/dataflow/sdk/ExamplesDependencies.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright (C) 2017 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk;
-
-import org.apache.beam.examples.MinimalWordCountJava8;
-import org.apache.beam.examples.WordCount;
-
-/**
- * Mark the examples dependencies as used at compile time. This is also needed
- * to produce some content in the final JAR file.
- */
-class ExamplesDependencies {
- SdkDependencies sdkDependencies;
- WordCount wordCount;
- MinimalWordCountJava8 minimalWordCount;
-}
diff --git a/maven-archetypes/examples-java8/pom.xml b/maven-archetypes/examples-java8/pom.xml
deleted file mode 100644
index 463c66f1d1..0000000000
--- a/maven-archetypes/examples-java8/pom.xml
+++ /dev/null
@@ -1,80 +0,0 @@
-
-
-
-
- 4.0.0
-
-
- com.google.cloud.dataflow
- google-cloud-dataflow-java-archetypes-parent
- 2.2.0-SNAPSHOT
- ../pom.xml
-
-
- google-cloud-dataflow-java-archetypes-examples-java8
- Google Cloud Dataflow SDK for Java - Java 8 Examples Archetype
- Google Cloud Dataflow SDK for Java is a distribution of Apache
- Beam designed to simplify usage of Apache Beam on Google Cloud Dataflow
- service. This archetype creates a project containing all the example
- pipelines targeting Java 8.
-
- maven-archetype
-
-
-
-
- org.apache.maven.archetype
- archetype-packaging
- 2.4
-
-
-
-
-
-
- maven-archetype-plugin
- 2.4
-
-
- org.apache.maven.shared
- maven-invoker
- 2.2
-
-
-
-
-
- default-integration-test
- install
-
- integration-test
-
-
-
-
-
-
-
-
-
-
diff --git a/maven-archetypes/examples-java8/src/main/resources/META-INF/maven/archetype-metadata.xml b/maven-archetypes/examples-java8/src/main/resources/META-INF/maven/archetype-metadata.xml
deleted file mode 100644
index 326fdaa528..0000000000
--- a/maven-archetypes/examples-java8/src/main/resources/META-INF/maven/archetype-metadata.xml
+++ /dev/null
@@ -1,38 +0,0 @@
-
-
-
-
-
-
- src/main/java
-
- **/*.java
-
-
-
-
- src/test/java
-
- **/*.java
-
-
-
-
diff --git a/maven-archetypes/examples-java8/src/main/resources/NOTICE b/maven-archetypes/examples-java8/src/main/resources/NOTICE
deleted file mode 100644
index 981fde5a9e..0000000000
--- a/maven-archetypes/examples-java8/src/main/resources/NOTICE
+++ /dev/null
@@ -1,5 +0,0 @@
-Google Cloud Dataflow SDK for Java
-Copyright 2017, Google Inc.
-
-This product includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
diff --git a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml b/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
deleted file mode 100644
index f33914d476..0000000000
--- a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml
+++ /dev/null
@@ -1,248 +0,0 @@
-
-
-
- 4.0.0
-
- ${groupId}
- ${artifactId}
- ${version}
-
- jar
-
-
- UTF-8
- 2.20
-
-
-
-
- ossrh.snapshots
- Sonatype OSS Repository Hosting
- https://oss.sonatype.org/content/repositories/snapshots/
-
- false
-
-
- true
-
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
- 3.6.1
-
- 1.8
- 1.8
-
-
-
-
- org.apache.maven.plugins
- maven-surefire-plugin
- ${surefire-plugin.version}
-
- all
- 4
- true
-
-
-
- org.apache.maven.surefire
- surefire-junit47
- ${surefire-plugin.version}
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
- 3.0.2
-
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
- 3.0.0
-
-
- package
-
- shade
-
-
- ${project.artifactId}-bundled-${project.version}
-
-
- *:*
-
- META-INF/LICENSE
- META-INF/*.SF
- META-INF/*.DSA
- META-INF/*.RSA
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- org.codehaus.mojo
- exec-maven-plugin
- 1.5.0
-
- false
-
-
-
-
-
-
-
-
-
- com.google.cloud.dataflow
- google-cloud-dataflow-java-sdk-all
- @project.version@
-
-
-
-
- com.google.api-client
- google-api-client
- 1.22.0
-
-
-
- com.google.guava
- guava-jdk5
-
-
-
-
-
- com.google.apis
- google-api-services-bigquery
- v2-rev295-1.22.0
-
-
-
- com.google.guava
- guava-jdk5
-
-
-
-
-
- com.google.http-client
- google-http-client
- 1.22.0
-
-
-
- com.google.guava
- guava-jdk5
-
-
-
-
-
- com.google.apis
- google-api-services-pubsub
- v1-rev10-1.22.0
-
-
-
- com.google.guava
- guava-jdk5
-
-
-
-
-
- joda-time
- joda-time
- 2.4
-
-
-
- com.google.guava
- guava
- 20.0
-
-
-
-
- org.slf4j
- slf4j-api
- 1.7.14
-
-
-
- org.slf4j
- slf4j-jdk14
- 1.7.14
-
- runtime
-
-
-
-
- org.hamcrest
- hamcrest-all
- 1.3
-
-
-
- junit
- junit
- 4.12
-
-
-
- org.mockito
- mockito-all
- 1.9.5
- test
-
-
-
diff --git a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
deleted file mode 100644
index 07870f2ed0..0000000000
--- a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ${package};
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.regex.Pattern;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * An example that verifies word counts in Shakespeare and includes Beam best practices.
- *
- *
This class, {@link DebuggingWordCount}, is the third in a series of four successively more
- * detailed 'word count' examples. You may first want to take a look at {@link MinimalWordCount}
- * and {@link WordCount}. After you've looked at this example, then see the
- * {@link WindowedWordCount} pipeline, for introduction of additional concepts.
- *
- *
Basic concepts, also in the MinimalWordCount and WordCount examples:
- * Reading text files; counting a PCollection; executing a Pipeline both locally
- * and using a selected runner; defining DoFns.
- *
- *
New Concepts:
- *
- * 1. Logging using SLF4J, even in a distributed environment
- * 2. Creating a custom metric (runners have varying levels of support)
- * 3. Testing your Pipeline via PAssert
- *
- *
- *
To execute this pipeline locally, specify general pipeline configuration:
- *
The input file defaults to a public data set containing the text of of King Lear,
- * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}.
- *
- */
-public class DebuggingWordCount {
- /** A DoFn that filters for a specific key based upon a regular expression. */
- public static class FilterTextFn extends DoFn, KV> {
- /**
- * Concept #1: The logger below uses the fully qualified class name of FilterTextFn as the
- * logger. Depending on your SLF4J configuration, log statements will likely be qualified by
- * this name.
- *
- *
Note that this is entirely standard SLF4J usage. Some runners may provide a default SLF4J
- * configuration that is most appropriate for their logging integration.
- */
- private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class);
-
- private final Pattern filter;
- public FilterTextFn(String pattern) {
- filter = Pattern.compile(pattern);
- }
-
- /**
- * Concept #2: A custom metric can track values in your pipeline as it runs. Each
- * runner provides varying levels of support for metrics, and may expose them
- * in a dashboard, etc.
- */
- private final Counter matchedWords = Metrics.counter(FilterTextFn.class, "matchedWords");
- private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unmatchedWords");
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- if (filter.matcher(c.element().getKey()).matches()) {
- // Log at the "DEBUG" level each element that we match. When executing this pipeline
- // these log lines will appear only if the log level is set to "DEBUG" or lower.
- LOG.debug("Matched: " + c.element().getKey());
- matchedWords.inc();
- c.output(c.element());
- } else {
- // Log at the "TRACE" level each element that is not matched. Different log levels
- // can be used to control the verbosity of logging providing an effective mechanism
- // to filter less important information.
- LOG.trace("Did not match: " + c.element().getKey());
- unmatchedWords.inc();
- }
- }
- }
-
- /**
- * Options supported by {@link DebuggingWordCount}.
- *
- *
Inherits standard configuration options and all options defined in
- * {@link WordCount.WordCountOptions}.
- */
- public interface WordCountOptions extends WordCount.WordCountOptions {
-
- @Description("Regex filter pattern to use in DebuggingWordCount. "
- + "Only words matching this pattern will be counted.")
- @Default.String("Flourish|stomach")
- String getFilterPattern();
- void setFilterPattern(String value);
- }
-
- public static void main(String[] args) {
- WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
- .as(WordCountOptions.class);
- Pipeline p = Pipeline.create(options);
-
- PCollection> filteredWords =
- p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
- .apply(new WordCount.CountWords())
- .apply(ParDo.of(new FilterTextFn(options.getFilterPattern())));
-
- /**
- * Concept #3: PAssert is a set of convenient PTransforms in the style of
- * Hamcrest's collection matchers that can be used when writing Pipeline level tests
- * to validate the contents of PCollections. PAssert is best used in unit tests
- * with small data sets but is demonstrated here as a teaching tool.
- *
- *
Below we verify that the set of filtered words matches our expected counts. Note
- * that PAssert does not provide any output and that successful completion of the
- * Pipeline implies that the expectations were met. Learn more at
- * https://beam.apache.org/documentation/pipelines/test-your-pipeline/ on how to test
- * your Pipeline and see {@link DebuggingWordCountTest} for an example unit test.
- */
- List> expectedResults = Arrays.asList(
- KV.of("Flourish", 3L),
- KV.of("stomach", 1L));
- PAssert.that(filteredWords).containsInAnyOrder(expectedResults);
-
- p.run().waitUntilFinish();
- }
-}
diff --git a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
deleted file mode 100644
index d6b08066db..0000000000
--- a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ${package};
-
-import ${package}.common.ExampleUtils;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.KV;
-
-
-/**
- * An example that counts words in Shakespeare.
- *
- *
This class, {@link MinimalWordCount}, is the first in a series of four successively more
- * detailed 'word count' examples. Here, for simplicity, we don't show any error-checking or
- * argument processing, and focus on construction of the pipeline, which chains together the
- * application of core transforms.
- *
- *
Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally the
- * {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional
- * concepts.
- *
- *
Concepts:
- *
- *
- * 1. Reading data from text files
- * 2. Specifying 'inline' transforms
- * 3. Counting items in a PCollection
- * 4. Writing data to text files
- *
- *
- *
No arguments are required to run this pipeline. It will be executed with the DirectRunner. You
- * can see the results in the output files in your current working directory, with names like
- * "wordcounts-00001-of-00005. When running on a distributed service, you would use an appropriate
- * file service.
- */
-public class MinimalWordCount {
-
- public static void main(String[] args) {
- // Create a PipelineOptions object. This object lets us set various execution
- // options for our pipeline, such as the runner you wish to use. This example
- // will run with the DirectRunner by default, based on the class path configured
- // in its dependencies.
- PipelineOptions options = PipelineOptionsFactory.create();
-
- // Create the Pipeline object with the options we defined above.
- Pipeline p = Pipeline.create(options);
-
- // Apply the pipeline's transforms.
-
- // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set
- // of input text files. TextIO.Read returns a PCollection where each element is one line from
- // the input text (a set of Shakespeare's texts).
-
- // This example reads a public data set consisting of the complete works of Shakespeare.
- p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
-
- // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a
- // DoFn (defined in-line) on each element that tokenizes the text line into individual words.
- // The ParDo returns a PCollection, where each element is an individual word in
- // Shakespeare's collected texts.
- .apply("ExtractWords", ParDo.of(new DoFn() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }))
-
- // Concept #3: Apply the Count transform to our PCollection of individual words. The Count
- // transform returns a new PCollection of key/value pairs, where each key represents a unique
- // word in the text. The associated value is the occurrence count for that word.
- .apply(Count.perElement())
-
- // Apply a MapElements transform that formats our PCollection of word counts into a printable
- // string, suitable for writing to an output file.
- .apply("FormatResults", MapElements.via(new SimpleFunction, String>() {
- @Override
- public String apply(KV input) {
- return input.getKey() + ": " + input.getValue();
- }
- }))
-
- // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.
- // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of
- // formatted strings) to a series of text files.
- //
- // By default, it will write to a set of files with names like wordcount-00001-of-00005
- .apply(TextIO.write().to("wordcounts"));
-
- // Run the pipeline.
- p.run().waitUntilFinish();
- }
-}
diff --git a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/MinimalWordCountJava8.java b/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/MinimalWordCountJava8.java
deleted file mode 100644
index e635a885b7..0000000000
--- a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/MinimalWordCountJava8.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ${package};
-
-import java.util.Arrays;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.sdk.transforms.FlatMapElements;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TypeDescriptors;
-
-/**
- * An example that counts words in Shakespeare, using Java 8 language features.
- *
- *
See {@link MinimalWordCount} for a comprehensive explanation.
- */
-public class MinimalWordCountJava8 {
-
- public static void main(String[] args) {
- PipelineOptions options = PipelineOptionsFactory.create();
- // In order to run your pipeline, you need to make following runner specific changes:
- //
- // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner
- // or FlinkRunner.
- // CHANGE 2/3: Specify runner-required options.
- // For BlockingDataflowRunner, set project and temp location as follows:
- // DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
- // dataflowOptions.setRunner(BlockingDataflowRunner.class);
- // dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
- // dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
- // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
- // for more details.
- // options.as(FlinkPipelineOptions.class)
- // .setRunner(FlinkRunner.class);
-
- Pipeline p = Pipeline.create(options);
-
- p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
- .apply(FlatMapElements
- .into(TypeDescriptors.strings())
- .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
- .apply(Filter.by((String word) -> !word.isEmpty()))
- .apply(Count.perElement())
- .apply(MapElements
- .into(TypeDescriptors.strings())
- .via((KV wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
- // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to.
- .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
-
- p.run().waitUntilFinish();
- }
-}
diff --git a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java b/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
deleted file mode 100644
index 6a1d07c485..0000000000
--- a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ${package};
-
-import java.io.IOException;
-import java.util.concurrent.ThreadLocalRandom;
-import ${package}.common.ExampleBigQueryTableOptions;
-import ${package}.common.ExampleOptions;
-import ${package}.common.WriteOneFilePerWindow;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-
-/**
- * An example that counts words in text, and can run over either unbounded or bounded input
- * collections.
- *
- *
This class, {@link WindowedWordCount}, is the last in a series of four successively more
- * detailed 'word count' examples. First take a look at {@link MinimalWordCount},
- * {@link WordCount}, and {@link DebuggingWordCount}.
- *
- *
Basic concepts, also in the MinimalWordCount, WordCount, and DebuggingWordCount examples:
- * Reading text files; counting a PCollection; writing to GCS; executing a Pipeline both locally
- * and using a selected runner; defining DoFns;
- * user-defined PTransforms; defining PipelineOptions.
- *
- *
New Concepts:
- *
- * 1. Unbounded and bounded pipeline input modes
- * 2. Adding timestamps to data
- * 3. Windowing
- * 4. Re-using PTransforms over windowed PCollections
- * 5. Accessing the window of an element
- * 6. Writing data to per-window text files
- *
- *
- *
By default, the examples will run with the {@code DirectRunner}.
- * To change the runner, specify:
- *
- * See examples/java/README.md for instructions about how to configure different runners.
- *
- *
To execute this pipeline locally, specify a local output file (if using the
- * {@code DirectRunner}) or output prefix on a supported distributed file system.
- *
The input file defaults to a public data set containing the text of of King Lear,
- * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}.
- *
- *
By default, the pipeline will do fixed windowing, on 1-minute windows. You can
- * change this interval by setting the {@code --windowSize} parameter, e.g. {@code --windowSize=10}
- * for 10-minute windows.
- *
- *
The example will try to cancel the pipeline on the signal to terminate the process (CTRL-C).
- */
-public class WindowedWordCount {
- static final int WINDOW_SIZE = 10; // Default window duration in minutes
- /**
- * Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for
- * this example, for the bounded data case.
- *
- *
Imagine that many ghosts of Shakespeare are all typing madly at the same time to recreate
- * his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a
- * 2-hour period.
- */
- static class AddTimestampFn extends DoFn {
- private static final Duration RAND_RANGE = Duration.standardHours(1);
- private final Instant minTimestamp;
- private final Instant maxTimestamp;
-
- AddTimestampFn(Instant minTimestamp, Instant maxTimestamp) {
- this.minTimestamp = minTimestamp;
- this.maxTimestamp = maxTimestamp;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- Instant randomTimestamp =
- new Instant(
- ThreadLocalRandom.current()
- .nextLong(minTimestamp.getMillis(), maxTimestamp.getMillis()));
-
- /**
- * Concept #2: Set the data element with that timestamp.
- */
- c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
- }
- }
-
- /** A {@link DefaultValueFactory} that returns the current system time. */
- public static class DefaultToCurrentSystemTime implements DefaultValueFactory {
- @Override
- public Long create(PipelineOptions options) {
- return System.currentTimeMillis();
- }
- }
-
- /** A {@link DefaultValueFactory} that returns the minimum timestamp plus one hour. */
- public static class DefaultToMinTimestampPlusOneHour implements DefaultValueFactory {
- @Override
- public Long create(PipelineOptions options) {
- return options.as(Options.class).getMinTimestampMillis()
- + Duration.standardHours(1).getMillis();
- }
- }
-
- /**
- * Options for {@link WindowedWordCount}.
- *
- *
Inherits standard example configuration options, which allow specification of the
- * runner, as well as the {@link WordCount.WordCountOptions} support for
- * specification of the input and output files.
- */
- public interface Options extends WordCount.WordCountOptions,
- ExampleOptions, ExampleBigQueryTableOptions {
- @Description("Fixed window duration, in minutes")
- @Default.Integer(WINDOW_SIZE)
- Integer getWindowSize();
- void setWindowSize(Integer value);
-
- @Description("Minimum randomly assigned timestamp, in milliseconds-since-epoch")
- @Default.InstanceFactory(DefaultToCurrentSystemTime.class)
- Long getMinTimestampMillis();
- void setMinTimestampMillis(Long value);
-
- @Description("Maximum randomly assigned timestamp, in milliseconds-since-epoch")
- @Default.InstanceFactory(DefaultToMinTimestampPlusOneHour.class)
- Long getMaxTimestampMillis();
- void setMaxTimestampMillis(Long value);
-
- @Description("Fixed number of shards to produce per window, or null for runner-chosen sharding")
- Integer getNumShards();
- void setNumShards(Integer numShards);
- }
-
- public static void main(String[] args) throws IOException {
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
- final String output = options.getOutput();
- final Instant minTimestamp = new Instant(options.getMinTimestampMillis());
- final Instant maxTimestamp = new Instant(options.getMaxTimestampMillis());
-
- Pipeline pipeline = Pipeline.create(options);
-
- /**
- * Concept #1: the Beam SDK lets us run the same pipeline with either a bounded or
- * unbounded input source.
- */
- PCollection input = pipeline
- /** Read from the GCS file. */
- .apply(TextIO.read().from(options.getInputFile()))
- // Concept #2: Add an element timestamp, using an artificial time just to show windowing.
- // See AddTimestampFn for more detail on this.
- .apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));
-
- /**
- * Concept #3: Window into fixed windows. The fixed window size for this example defaults to 1
- * minute (you can change this with a command-line option). See the documentation for more
- * information on how fixed windows work, and for information on the other types of windowing
- * available (e.g., sliding windows).
- */
- PCollection windowedWords =
- input.apply(
- Window.into(
- FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
-
- /**
- * Concept #4: Re-use our existing CountWords transform that does not have knowledge of
- * windows over a PCollection containing windowed values.
- */
- PCollection> wordCounts = windowedWords.apply(new WordCount.CountWords());
-
- /**
- * Concept #5: Format the results and write to a sharded file partitioned by window, using a
- * simple ParDo operation. Because there may be failures followed by retries, the
- * writes must be idempotent, but the details of writing to files is elided here.
- */
- wordCounts
- .apply(MapElements.via(new WordCount.FormatAsTextFn()))
- .apply(new WriteOneFilePerWindow(output, options.getNumShards()));
-
- PipelineResult result = pipeline.run();
- try {
- result.waitUntilFinish();
- } catch (Exception exc) {
- result.cancel();
- }
- }
-
-}
diff --git a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/WordCount.java b/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/WordCount.java
deleted file mode 100644
index 79b71403b9..0000000000
--- a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/WordCount.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ${package};
-
-import ${package}.common.ExampleUtils;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.Validation.Required;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * An example that counts words in Shakespeare and includes Beam best practices.
- *
- *
This class, {@link WordCount}, is the second in a series of four successively more detailed
- * 'word count' examples. You may first want to take a look at {@link MinimalWordCount}.
- * After you've looked at this example, then see the {@link DebuggingWordCount}
- * pipeline, for introduction of additional concepts.
- *
- *
Basic concepts, also in the MinimalWordCount example:
- * Reading text files; counting a PCollection; writing to text files
- *
- *
New Concepts:
- *
- * 1. Executing a Pipeline both locally and using the selected runner
- * 2. Using ParDo with static DoFns defined out-of-line
- * 3. Building a composite transform
- * 4. Defining your own pipeline options
- *
- *
- *
Concept #1: you can execute this pipeline either locally or using by selecting another runner.
- * These are now command-line options and not hard-coded as they were in the MinimalWordCount
- * example.
- *
- *
To execute this pipeline, specify a local output file (if using the
- * {@code DirectRunner}) or output prefix on a supported distributed file system.
- *
The input file defaults to a public data set containing the text of of King Lear,
- * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}.
- */
-public class WordCount {
-
- /**
- * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns
- * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it
- * to a ParDo in the pipeline.
- */
- static class ExtractWordsFn extends DoFn {
- private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- if (c.element().trim().isEmpty()) {
- emptyLines.inc();
- }
-
- // Split the line into words.
- String[] words = c.element().split(ExampleUtils.TOKENIZER_PATTERN);
-
- // Output each word encountered into the output PCollection.
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }
-
- /** A SimpleFunction that converts a Word and Count into a printable string. */
- public static class FormatAsTextFn extends SimpleFunction, String> {
- @Override
- public String apply(KV input) {
- return input.getKey() + ": " + input.getValue();
- }
- }
-
- /**
- * A PTransform that converts a PCollection containing lines of text into a PCollection of
- * formatted word counts.
- *
- *
Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
- * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
- * modular testing, and an improved monitoring experience.
- */
- public static class CountWords extends PTransform,
- PCollection>> {
- @Override
- public PCollection> expand(PCollection lines) {
-
- // Convert lines of text into individual words.
- PCollection words = lines.apply(
- ParDo.of(new ExtractWordsFn()));
-
- // Count the number of times each word occurs.
- PCollection> wordCounts =
- words.apply(Count.perElement());
-
- return wordCounts;
- }
- }
-
- /**
- * Options supported by {@link WordCount}.
- *
- *
Concept #4: Defining your own configuration options. Here, you can add your own arguments
- * to be processed by the command-line parser, and specify default values for them. You can then
- * access the options values in your pipeline code.
- *
- *
Inherits standard configuration options.
- */
- public interface WordCountOptions extends PipelineOptions {
-
- /**
- * By default, this example reads from a public dataset containing the text of
- * King Lear. Set this option to choose a different input file or glob.
- */
- @Description("Path of the file to read from")
- @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
- String getInputFile();
- void setInputFile(String value);
-
- /**
- * Set this required option to specify where to write the output.
- */
- @Description("Path of the file to write to")
- @Required
- String getOutput();
- void setOutput(String value);
- }
-
- public static void main(String[] args) {
- WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
- .as(WordCountOptions.class);
- Pipeline p = Pipeline.create(options);
-
- // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
- // static FormatAsTextFn() to the ParDo transform.
- p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
- .apply(new CountWords())
- .apply(MapElements.via(new FormatAsTextFn()))
- .apply("WriteCounts", TextIO.write().to(options.getOutput()));
-
- p.run().waitUntilFinish();
- }
-}
diff --git a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java b/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java
deleted file mode 100644
index 57f1546e27..0000000000
--- a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ${package}.common;
-
-import com.google.api.services.bigquery.model.TableSchema;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * Options that can be used to configure BigQuery tables in Beam examples.
- * The project defaults to the project being used to run the example.
- */
-public interface ExampleBigQueryTableOptions extends GcpOptions {
- @Description("BigQuery dataset name")
- @Default.String("beam_examples")
- String getBigQueryDataset();
- void setBigQueryDataset(String dataset);
-
- @Description("BigQuery table name")
- @Default.InstanceFactory(BigQueryTableFactory.class)
- String getBigQueryTable();
- void setBigQueryTable(String table);
-
- @Description("BigQuery table schema")
- TableSchema getBigQuerySchema();
- void setBigQuerySchema(TableSchema schema);
-
- /**
- * Returns the job name as the default BigQuery table name.
- */
- class BigQueryTableFactory implements DefaultValueFactory {
- @Override
- public String create(PipelineOptions options) {
- return options.getJobName().replace('-', '_');
- }
- }
-}
diff --git a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java b/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java
deleted file mode 100644
index 90f935c3ce..0000000000
--- a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ${package}.common;
-
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * Options that can be used to configure the Beam examples.
- */
-public interface ExampleOptions extends PipelineOptions {
- @Description("Whether to keep jobs running after local process exit")
- @Default.Boolean(false)
- boolean getKeepJobsRunning();
- void setKeepJobsRunning(boolean keepJobsRunning);
-
- @Description("Number of workers to use when executing the injector pipeline")
- @Default.Integer(1)
- int getInjectorNumWorkers();
- void setInjectorNumWorkers(int numWorkers);
-}
diff --git a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java b/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java
deleted file mode 100644
index cf142a10fd..0000000000
--- a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ${package}.common;
-
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * Options that can be used to configure Pub/Sub topic/subscription in Beam examples.
- */
-public interface ExamplePubsubTopicAndSubscriptionOptions extends ExamplePubsubTopicOptions {
- @Description("Pub/Sub subscription")
- @Default.InstanceFactory(PubsubSubscriptionFactory.class)
- String getPubsubSubscription();
- void setPubsubSubscription(String subscription);
-
- /**
- * Returns a default Pub/Sub subscription based on the project and the job names.
- */
- class PubsubSubscriptionFactory implements DefaultValueFactory {
- @Override
- public String create(PipelineOptions options) {
- return "projects/" + options.as(GcpOptions.class).getProject()
- + "/subscriptions/" + options.getJobName();
- }
- }
-}
diff --git a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java b/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java
deleted file mode 100644
index 86784b06da..0000000000
--- a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ${package}.common;
-
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * Options that can be used to configure Pub/Sub topic in Beam examples.
- */
-public interface ExamplePubsubTopicOptions extends GcpOptions {
- @Description("Pub/Sub topic")
- @Default.InstanceFactory(PubsubTopicFactory.class)
- String getPubsubTopic();
- void setPubsubTopic(String topic);
-
- /**
- * Returns a default Pub/Sub topic based on the project and the job names.
- */
- class PubsubTopicFactory implements DefaultValueFactory {
- @Override
- public String create(PipelineOptions options) {
- return "projects/" + options.as(GcpOptions.class).getProject()
- + "/topics/" + options.getJobName();
- }
- }
-}
diff --git a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java b/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java
deleted file mode 100644
index 78f3849b40..0000000000
--- a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ${package}.common;
-
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.Bigquery.Datasets;
-import com.google.api.services.bigquery.Bigquery.Tables;
-import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.DatasetReference;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.Subscription;
-import com.google.api.services.pubsub.model.Topic;
-import com.google.auth.Credentials;
-import com.google.auth.http.HttpCredentialsAdapter;
-import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.BackOff;
-import org.apache.beam.sdk.util.BackOffUtils;
-import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
-import org.apache.beam.sdk.util.Sleeper;
-import org.apache.beam.sdk.util.Transport;
-import org.joda.time.Duration;
-
-/**
- * The utility class that sets up and tears down external resources,
- * and cancels the streaming pipelines once the program terminates.
- *
- *
It is used to run Beam examples.
- */
-public class ExampleUtils {
-
- private static final int SC_NOT_FOUND = 404;
-
- /**
- * \p{L} denotes the category of Unicode letters,
- * so this pattern will match on everything that is not a letter.
- *
- *
It is used for tokenizing strings in the wordcount examples.
- */
- public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
-
- private final PipelineOptions options;
- private Bigquery bigQueryClient = null;
- private Pubsub pubsubClient = null;
- private Set pipelinesToCancel = Sets.newHashSet();
- private List pendingMessages = Lists.newArrayList();
-
- /**
- * Do resources and runner options setup.
- */
- public ExampleUtils(PipelineOptions options) {
- this.options = options;
- }
-
- /**
- * Sets up external resources that are required by the example,
- * such as Pub/Sub topics and BigQuery tables.
- *
- * @throws IOException if there is a problem setting up the resources
- */
- public void setup() throws IOException {
- Sleeper sleeper = Sleeper.DEFAULT;
- BackOff backOff =
- FluentBackoff.DEFAULT
- .withMaxRetries(3).withInitialBackoff(Duration.millis(200)).backoff();
- Throwable lastException = null;
- try {
- do {
- try {
- setupPubsub();
- setupBigQueryTable();
- return;
- } catch (GoogleJsonResponseException e) {
- lastException = e;
- }
- } while (BackOffUtils.next(sleeper, backOff));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- // Ignore InterruptedException
- }
- throw new RuntimeException(lastException);
- }
-
- /**
- * Sets up the Google Cloud Pub/Sub topic.
- *
- *
If the topic doesn't exist, a new topic with the given name will be created.
- *
- * @throws IOException if there is a problem setting up the Pub/Sub topic
- */
- public void setupPubsub() throws IOException {
- ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
- options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
- if (!pubsubOptions.getPubsubTopic().isEmpty()) {
- pendingMessages.add("**********************Set Up Pubsub************************");
- setupPubsubTopic(pubsubOptions.getPubsubTopic());
- pendingMessages.add("The Pub/Sub topic has been set up for this example: "
- + pubsubOptions.getPubsubTopic());
-
- if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
- setupPubsubSubscription(
- pubsubOptions.getPubsubTopic(), pubsubOptions.getPubsubSubscription());
- pendingMessages.add("The Pub/Sub subscription has been set up for this example: "
- + pubsubOptions.getPubsubSubscription());
- }
- }
- }
-
- /**
- * Sets up the BigQuery table with the given schema.
- *
- *
If the table already exists, the schema has to match the given one. Otherwise, the example
- * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema
- * will be created.
- *
- * @throws IOException if there is a problem setting up the BigQuery table
- */
- public void setupBigQueryTable() throws IOException {
- ExampleBigQueryTableOptions bigQueryTableOptions =
- options.as(ExampleBigQueryTableOptions.class);
- if (bigQueryTableOptions.getBigQueryDataset() != null
- && bigQueryTableOptions.getBigQueryTable() != null
- && bigQueryTableOptions.getBigQuerySchema() != null) {
- pendingMessages.add("******************Set Up Big Query Table*******************");
- setupBigQueryTable(bigQueryTableOptions.getProject(),
- bigQueryTableOptions.getBigQueryDataset(),
- bigQueryTableOptions.getBigQueryTable(),
- bigQueryTableOptions.getBigQuerySchema());
- pendingMessages.add("The BigQuery table has been set up for this example: "
- + bigQueryTableOptions.getProject()
- + ":" + bigQueryTableOptions.getBigQueryDataset()
- + "." + bigQueryTableOptions.getBigQueryTable());
- }
- }
-
- /**
- * Tears down external resources that can be deleted upon the example's completion.
- */
- private void tearDown() {
- pendingMessages.add("*************************Tear Down*************************");
- ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
- options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
- if (!pubsubOptions.getPubsubTopic().isEmpty()) {
- try {
- deletePubsubTopic(pubsubOptions.getPubsubTopic());
- pendingMessages.add("The Pub/Sub topic has been deleted: "
- + pubsubOptions.getPubsubTopic());
- } catch (IOException e) {
- pendingMessages.add("Failed to delete the Pub/Sub topic : "
- + pubsubOptions.getPubsubTopic());
- }
- if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
- try {
- deletePubsubSubscription(pubsubOptions.getPubsubSubscription());
- pendingMessages.add("The Pub/Sub subscription has been deleted: "
- + pubsubOptions.getPubsubSubscription());
- } catch (IOException e) {
- pendingMessages.add("Failed to delete the Pub/Sub subscription : "
- + pubsubOptions.getPubsubSubscription());
- }
- }
- }
-
- ExampleBigQueryTableOptions bigQueryTableOptions =
- options.as(ExampleBigQueryTableOptions.class);
- if (bigQueryTableOptions.getBigQueryDataset() != null
- && bigQueryTableOptions.getBigQueryTable() != null
- && bigQueryTableOptions.getBigQuerySchema() != null) {
- pendingMessages.add("The BigQuery table might contain the example's output, "
- + "and it is not deleted automatically: "
- + bigQueryTableOptions.getProject()
- + ":" + bigQueryTableOptions.getBigQueryDataset()
- + "." + bigQueryTableOptions.getBigQueryTable());
- pendingMessages.add("Please go to the Developers Console to delete it manually."
- + " Otherwise, you may be charged for its usage.");
- }
- }
-
- /**
- * Returns a BigQuery client builder using the specified {@link BigQueryOptions}.
- */
- private static Bigquery.Builder newBigQueryClient(BigQueryOptions options) {
- return new Bigquery.Builder(Transport.getTransport(), Transport.getJsonFactory(),
- chainHttpRequestInitializer(
- options.getGcpCredential(),
- // Do not log 404. It clutters the output and is possibly even required by the caller.
- new RetryHttpRequestInitializer(ImmutableList.of(404))))
- .setApplicationName(options.getAppName())
- .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
- }
-
- /**
- * Returns a Pubsub client builder using the specified {@link PubsubOptions}.
- */
- private static Pubsub.Builder newPubsubClient(PubsubOptions options) {
- return new Pubsub.Builder(Transport.getTransport(), Transport.getJsonFactory(),
- chainHttpRequestInitializer(
- options.getGcpCredential(),
- // Do not log 404. It clutters the output and is possibly even required by the caller.
- new RetryHttpRequestInitializer(ImmutableList.of(404))))
- .setRootUrl(options.getPubsubRootUrl())
- .setApplicationName(options.getAppName())
- .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
- }
-
- private static HttpRequestInitializer chainHttpRequestInitializer(
- Credentials credential, HttpRequestInitializer httpRequestInitializer) {
- if (credential == null) {
- return new ChainingHttpRequestInitializer(
- new NullCredentialInitializer(), httpRequestInitializer);
- } else {
- return new ChainingHttpRequestInitializer(
- new HttpCredentialsAdapter(credential),
- httpRequestInitializer);
- }
- }
-
- private void setupBigQueryTable(String projectId, String datasetId, String tableId,
- TableSchema schema) throws IOException {
- if (bigQueryClient == null) {
- bigQueryClient = newBigQueryClient(options.as(BigQueryOptions.class)).build();
- }
-
- Datasets datasetService = bigQueryClient.datasets();
- if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) {
- Dataset newDataset = new Dataset().setDatasetReference(
- new DatasetReference().setProjectId(projectId).setDatasetId(datasetId));
- datasetService.insert(projectId, newDataset).execute();
- }
-
- Tables tableService = bigQueryClient.tables();
- Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId));
- if (table == null) {
- Table newTable = new Table().setSchema(schema).setTableReference(
- new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId));
- tableService.insert(projectId, datasetId, newTable).execute();
- } else if (!table.getSchema().equals(schema)) {
- throw new RuntimeException(
- "Table exists and schemas do not match, expecting: " + schema.toPrettyString()
- + ", actual: " + table.getSchema().toPrettyString());
- }
- }
-
- private void setupPubsubTopic(String topic) throws IOException {
- if (pubsubClient == null) {
- pubsubClient = newPubsubClient(options.as(PubsubOptions.class)).build();
- }
- if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) {
- pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute();
- }
- }
-
- private void setupPubsubSubscription(String topic, String subscription) throws IOException {
- if (pubsubClient == null) {
- pubsubClient = newPubsubClient(options.as(PubsubOptions.class)).build();
- }
- if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) {
- Subscription subInfo = new Subscription()
- .setAckDeadlineSeconds(60)
- .setTopic(topic);
- pubsubClient.projects().subscriptions().create(subscription, subInfo).execute();
- }
- }
-
- /**
- * Deletes the Google Cloud Pub/Sub topic.
- *
- * @throws IOException if there is a problem deleting the Pub/Sub topic
- */
- private void deletePubsubTopic(String topic) throws IOException {
- if (pubsubClient == null) {
- pubsubClient = newPubsubClient(options.as(PubsubOptions.class)).build();
- }
- if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) {
- pubsubClient.projects().topics().delete(topic).execute();
- }
- }
-
- /**
- * Deletes the Google Cloud Pub/Sub subscription.
- *
- * @throws IOException if there is a problem deleting the Pub/Sub subscription
- */
- private void deletePubsubSubscription(String subscription) throws IOException {
- if (pubsubClient == null) {
- pubsubClient = newPubsubClient(options.as(PubsubOptions.class)).build();
- }
- if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) {
- pubsubClient.projects().subscriptions().delete(subscription).execute();
- }
- }
-
- /**
- * Waits for the pipeline to finish and cancels it before the program exists.
- */
- public void waitToFinish(PipelineResult result) {
- pipelinesToCancel.add(result);
- if (!options.as(ExampleOptions.class).getKeepJobsRunning()) {
- addShutdownHook(pipelinesToCancel);
- }
- try {
- result.waitUntilFinish();
- } catch (UnsupportedOperationException e) {
- // Do nothing if the given PipelineResult doesn't support waitUntilFinish(),
- // such as EvaluationResults returned by DirectRunner.
- tearDown();
- printPendingMessages();
- } catch (Exception e) {
- throw new RuntimeException("Failed to wait the pipeline until finish: " + result);
- }
- }
-
- private void addShutdownHook(final Collection pipelineResults) {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- tearDown();
- printPendingMessages();
- for (PipelineResult pipelineResult : pipelineResults) {
- try {
- pipelineResult.cancel();
- } catch (IOException e) {
- System.out.println("Failed to cancel the job.");
- System.out.println(e.getMessage());
- }
- }
-
- for (PipelineResult pipelineResult : pipelineResults) {
- boolean cancellationVerified = false;
- for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) {
- if (pipelineResult.getState().isTerminal()) {
- cancellationVerified = true;
- break;
- } else {
- System.out.println(
- "The example pipeline is still running. Verifying the cancellation.");
- }
- Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
- }
- if (!cancellationVerified) {
- System.out.println("Failed to verify the cancellation for job: " + pipelineResult);
- }
- }
- }
- });
- }
-
- private void printPendingMessages() {
- System.out.println();
- System.out.println("***********************************************************");
- System.out.println("***********************************************************");
- for (String message : pendingMessages) {
- System.out.println(message);
- }
- System.out.println("***********************************************************");
- System.out.println("***********************************************************");
- }
-
- private static T executeNullIfNotFound(
- AbstractGoogleClientRequest request) throws IOException {
- try {
- return request.execute();
- } catch (GoogleJsonResponseException e) {
- if (e.getStatusCode() == SC_NOT_FOUND) {
- return null;
- } else {
- throw e;
- }
- }
- }
-}
diff --git a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/WriteOneFilePerWindow.java b/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/WriteOneFilePerWindow.java
deleted file mode 100644
index c7296162b6..0000000000
--- a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/WriteOneFilePerWindow.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ${package}.common;
-
-import static com.google.common.base.MoreObjects.firstNonNull;
-
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.FileBasedSink;
-import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
-import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
-
-/**
- * A {@link DoFn} that writes elements to files with names deterministically derived from the lower
- * and upper bounds of their key (an {@link IntervalWindow}).
- *
- *
This is test utility code, not for end-users, so examples can be focused on their primary
- * lessons.
- */
-public class WriteOneFilePerWindow extends PTransform, PDone> {
- private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute();
- private String filenamePrefix;
- @Nullable
- private Integer numShards;
-
- public WriteOneFilePerWindow(String filenamePrefix, Integer numShards) {
- this.filenamePrefix = filenamePrefix;
- this.numShards = numShards;
- }
-
- @Override
- public PDone expand(PCollection input) {
- ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
- TextIO.Write write =
- TextIO.write()
- .to(new PerWindowFiles(resource))
- .withTempDirectory(resource.getCurrentDirectory())
- .withWindowedWrites();
- if (numShards != null) {
- write = write.withNumShards(numShards);
- }
- return input.apply(write);
- }
-
- /**
- * A {@link FilenamePolicy} produces a base file name for a write based on metadata about the data
- * being written. This always includes the shard number and the total number of shards. For
- * windowed writes, it also includes the window and pane index (a sequence number assigned to each
- * trigger firing).
- */
- public static class PerWindowFiles extends FilenamePolicy {
-
- private final ResourceId baseFilename;
-
- public PerWindowFiles(ResourceId baseFilename) {
- this.baseFilename = baseFilename;
- }
-
- public String filenamePrefixForWindow(IntervalWindow window) {
- String prefix =
- baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), "");
- return String.format("%s-%s-%s",
- prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
- }
-
- @Override
- public ResourceId windowedFilename(int shardNumber,
- int numShards,
- BoundedWindow window,
- PaneInfo paneInfo,
- OutputFileHints outputFileHints) {
- IntervalWindow intervalWindow = (IntervalWindow) window;
- String filename =
- String.format(
- "%s-%s-of-%s%s",
- filenamePrefixForWindow(intervalWindow),
- shardNumber,
- numShards,
- outputFileHints.getSuggestedFilenameSuffix());
- return baseFilename
- .getCurrentDirectory()
- .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
- }
-
- @Override
- public ResourceId unwindowedFilename(
- int shardNumber, int numShards, OutputFileHints outputFileHints) {
- throw new UnsupportedOperationException("Unsupported.");
- }
- }
-}
diff --git a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/complete/game/GameStats.java b/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/complete/game/GameStats.java
deleted file mode 100644
index a286811293..0000000000
--- a/maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/complete/game/GameStats.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ${package}.complete.game;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TimeZone;
-import ${package}.common.ExampleUtils;
-import ${package}.complete.game.utils.WriteWindowedToBigQuery;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.Mean;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TypeDescriptors;
-import org.joda.time.DateTimeZone;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class is the fourth in a series of four pipelines that tell a story in a 'gaming'
- * domain, following {@link UserScore}, {@link HourlyTeamScore}, and {@link LeaderBoard}.
- * New concepts: session windows and finding session duration; use of both
- * singleton and non-singleton side inputs.
- *
- *
This pipeline builds on the {@link LeaderBoard} functionality, and adds some "business
- * intelligence" analysis: abuse detection and usage patterns. The pipeline derives the Mean user
- * score sum for a window, and uses that information to identify likely spammers/robots. (The robots
- * have a higher click rate than the human users). The 'robot' users are then filtered out when
- * calculating the team scores.
- *
- *
Additionally, user sessions are tracked: that is, we find bursts of user activity using
- * session windows. Then, the mean session duration information is recorded in the context of
- * subsequent fixed windowing. (This could be used to tell us what games are giving us greater
- * user retention).
- *
- *
Run {@code org.apache.beam.examples.complete.game.injector.Injector} to generate
- * pubsub data for this pipeline. The {@code Injector} documentation provides more detail.
- *
- *
To execute this pipeline, specify the pipeline configuration like this:
- *
The BigQuery dataset you specify must already exist. The PubSub topic you specify should
- * be the same topic to which the Injector is publishing.
- */
-public class GameStats extends LeaderBoard {
-
- private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";
-
- private static DateTimeFormatter fmt =
- DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
- .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));
-
- /**
- * Filter out all but those users with a high clickrate, which we will consider as 'spammy' uesrs.
- * We do this by finding the mean total score per user, then using that information as a side
- * input to filter out all but those user scores that are larger than
- * {@code (mean * SCORE_WEIGHT)}.
- */
- // [START DocInclude_AbuseDetect]
- public static class CalculateSpammyUsers
- extends PTransform>, PCollection>> {
- private static final Logger LOG = LoggerFactory.getLogger(CalculateSpammyUsers.class);
- private static final double SCORE_WEIGHT = 2.5;
-
- @Override
- public PCollection> expand(PCollection> userScores) {
-
- // Get the sum of scores for each user.
- PCollection> sumScores = userScores
- .apply("UserSum", Sum.integersPerKey());
-
- // Extract the score from each element, and use it to find the global mean.
- final PCollectionView globalMeanScore = sumScores.apply(Values.create())
- .apply(Mean.globally().asSingletonView());
-
- // Filter the user sums using the global mean.
- PCollection> filtered = sumScores
- .apply("ProcessAndFilter", ParDo
- // use the derived mean total score as a side input
- .of(new DoFn, KV>() {
- private final Counter numSpammerUsers = Metrics.counter("main", "SpammerUsers");
- @ProcessElement
- public void processElement(ProcessContext c) {
- Integer score = c.element().getValue();
- Double gmc = c.sideInput(globalMeanScore);
- if (score > (gmc * SCORE_WEIGHT)) {
- LOG.info("user " + c.element().getKey() + " spammer score " + score
- + " with mean " + gmc);
- numSpammerUsers.inc();
- c.output(c.element());
- }
- }
- }).withSideInputs(globalMeanScore));
- return filtered;
- }
- }
- // [END DocInclude_AbuseDetect]
-
- /**
- * Calculate and output an element's session duration.
- */
- private static class UserSessionInfoFn extends DoFn, Integer> {
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) {
- IntervalWindow w = (IntervalWindow) window;
- int duration = new Duration(
- w.start(), w.end()).toPeriod().toStandardMinutes().getMinutes();
- c.output(duration);
- }
- }
-
-
- /**
- * Options supported by {@link GameStats}.
- */
- interface Options extends LeaderBoard.Options {
- @Description("Numeric value of fixed window duration for user analysis, in minutes")
- @Default.Integer(60)
- Integer getFixedWindowDuration();
- void setFixedWindowDuration(Integer value);
-
- @Description("Numeric value of gap between user sessions, in minutes")
- @Default.Integer(5)
- Integer getSessionGap();
- void setSessionGap(Integer value);
-
- @Description("Numeric value of fixed window for finding mean of user session duration, "
- + "in minutes")
- @Default.Integer(30)
- Integer getUserActivityWindowDuration();
- void setUserActivityWindowDuration(Integer value);
-
- @Description("Prefix used for the BigQuery table names")
- @Default.String("game_stats")
- String getGameStatsTablePrefix();
- void setGameStatsTablePrefix(String value);
- }
-
-
- /**
- * Create a map of information that describes how to write pipeline output to BigQuery. This map
- * is used to write information about team score sums.
- */
- protected static Map>>
- configureWindowedWrite() {
- Map>> tableConfigure =
- new HashMap>>();
- tableConfigure.put(
- "team",
- new WriteWindowedToBigQuery.FieldInfo>(
- "STRING", (c, w) -> c.element().getKey()));
- tableConfigure.put(
- "total_score",
- new WriteWindowedToBigQuery.FieldInfo>(
- "INTEGER", (c, w) -> c.element().getValue()));
- tableConfigure.put(
- "window_start",
- new WriteWindowedToBigQuery.FieldInfo>(
- "STRING",
- (c, w) -> {
- IntervalWindow window = (IntervalWindow) w;
- return fmt.print(window.start());
- }));
- tableConfigure.put(
- "processing_time",
- new WriteWindowedToBigQuery.FieldInfo>(
- "STRING", (c, w) -> fmt.print(Instant.now())));
- return tableConfigure;
- }
-
- /**
- * Create a map of information that describes how to write pipeline output to BigQuery. This map
- * is used to write information about mean user session time.
- */
- protected static Map>
- configureSessionWindowWrite() {
-
- Map> tableConfigure =
- new HashMap>();
- tableConfigure.put(
- "window_start",
- new WriteWindowedToBigQuery.FieldInfo(
- "STRING",
- (c, w) -> {
- IntervalWindow window = (IntervalWindow) w;
- return fmt.print(window.start());
- }));
- tableConfigure.put(
- "mean_duration",
- new WriteWindowedToBigQuery.FieldInfo("FLOAT", (c, w) -> c.element()));
- return tableConfigure;
- }
-
-
-
- public static void main(String[] args) throws Exception {
-
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
- // Enforce that this pipeline is always run in streaming mode.
- options.setStreaming(true);
- ExampleUtils exampleUtils = new ExampleUtils(options);
- Pipeline pipeline = Pipeline.create(options);
-
- // Read Events from Pub/Sub using custom timestamps
- PCollection rawEvents = pipeline
- .apply(PubsubIO.readStrings()
- .withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
- .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
-
- // Extract username/score pairs from the event stream
- PCollection> userEvents =
- rawEvents.apply("ExtractUserScore",
- MapElements
- .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
- .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())));
-
- // Calculate the total score per user over fixed windows, and
- // cumulative updates for late data.
- final PCollectionView