Project author: irchanbani

Project description:
Another Dataflow Templates
Language: Java
Repository: git://github.com/irchanbani/nusa.git
Created: 2020-09-26T05:06:13Z
Project page: https://github.com/irchanbani/nusa


nusa

Another set of Dataflow Templates that hopes to complement the DataflowExamples from Google. I created this repository because I want to learn more about Java, Apache Beam, and Dataflow templates. I will try to keep the Apache Beam version in the pom.xml up to date.

Disclaimer:
A lot of the code in this repo is copied from the DataflowExamples; I just modified some configuration, such as using Pub/Sub Subscriptions instead of Pub/Sub Topics. :bow:

Preparation

This repository uses Java (JDK 8) and Apache Maven.

If both are installed, you will see output like this:

  java -version
  openjdk version "1.8.0_272"
  OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_272-b10)
  OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.272-b10, mixed mode)

  mvn -version
  Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
  Maven home: /usr/local/Cellar/maven/3.6.3_1/libexec
  Java version: 1.8.0_272, vendor: AdoptOpenJDK, runtime: /Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home/jre
  Default locale: en_ID, platform encoding: UTF-8
  OS name: "mac os x", version: "10.15.1", arch: "x86_64", family: "mac"

Creating Template

Based on this reference:

  mvn compile exec:java \
    -Dexec.mainClass=com.irchanbani.beam.PubsubSubscriptionToAvro \
    -Dexec.cleanupDaemonThreads=false \
    -Dexec.args=" \
    --runner=DataflowRunner \
    --enableStreamingEngine \
    --diskSizeGb=30 \
    --project=[YOUR_PROJECT_ID] \
    --region=[YOUR_BUCKET_REGION] \
    --tempLocation=gs://[YOUR_BUCKET_NAME]/temp \
    --stagingLocation=gs://[YOUR_BUCKET_NAME]/staging \
    --templateLocation=gs://[YOUR_BUCKET_NAME]/templates/[BEAM_VERSION]/<template-name>"
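The same `gs://[YOUR_BUCKET_NAME]/templates/[BEAM_VERSION]/<template-name>` path is reused by the metadata copy and the run step below, so it helps to build it once. A small sketch (the bucket, version, and template name are hypothetical placeholders, not values from this repo):

```shell
# Hypothetical values for illustration; substitute your own.
BUCKET="my-bucket"
BEAM_VERSION="2.25.0"
TEMPLATE_NAME="pubsub-subscription-to-avro"

# Single source of truth for the template path used in every later step.
TEMPLATE_PATH="gs://${BUCKET}/templates/${BEAM_VERSION}/${TEMPLATE_NAME}"
echo "${TEMPLATE_PATH}"
```

You can then pass `--templateLocation="${TEMPLATE_PATH}"` above and `--gcs-location "${TEMPLATE_PATH}"` when running the job, keeping the two in sync.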

You also need to copy the metadata file to the same folder as the template.

  gsutil cp metadata/Cloud_PubSub_Subscription_to_Avro_metadata gs://[YOUR_BUCKET_NAME]/templates/[BEAM_VERSION]/<template-name>
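The metadata file describes the template's parameters in the classic Dataflow template metadata format (a JSON document named `<template-name>_metadata`). A minimal sketch, with parameter names taken from the run command below and everything else illustrative:

```json
{
  "name": "Cloud Pub/Sub Subscription to Avro",
  "description": "Streams messages from a Pub/Sub subscription to Avro files on Cloud Storage.",
  "parameters": [
    {
      "name": "inputSubscription",
      "label": "Input Pub/Sub subscription",
      "helpText": "Format: projects/[PROJECT_ID]/subscriptions/[SUBSCRIPTION_ID]",
      "isOptional": false
    },
    {
      "name": "outputDirectory",
      "label": "Output directory",
      "helpText": "GCS path where output Avro files are written, e.g. gs://[BUCKET_NAME]/",
      "isOptional": false
    }
  ]
}
```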

Run Dataflow

Based on this reference:

  gcloud dataflow jobs run [JOB_NAME] \
    --gcs-location gs://[YOUR_BUCKET_NAME]/templates/[BEAM_VERSION]/<template-name> \
    --region [REGION_ID] \
    --network [NETWORK] \
    --subnetwork [SUBNETWORK] \
    --max-workers [MAX_WORKERS] \
    --worker-machine-type [WORKER_MACHINE_TYPE] \
    --disable-public-ips \
    --parameters \
    inputSubscription=projects/[PROJECT_ID]/subscriptions/[SUBSCRIPTIONS_ID],\
    outputDirectory=gs://[BUCKET_NAME],\
    outputFilenamePrefix=[PREFIX],\
    outputFilenameSuffix=[SUFFIX],\
    inputAttributeTimestamp=[PUBSUB_TIMESTAMP_ATTRIBUTE],\
    inputAttributeId=[PUBSUB_ID_ATTRIBUTE],\
    numShards=[NUM_SHARDS],\
    avroTempDirectory=gs://[BUCKET_NAME]/tmp/
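Note that `inputSubscription` must be the full resource name, not just the subscription ID. A sketch of assembling it from its parts (the project and subscription IDs below are hypothetical):

```shell
# Hypothetical values for illustration; substitute your own.
PROJECT_ID="my-project"
SUBSCRIPTION_ID="my-sub"

# Full resource name expected by the inputSubscription parameter.
INPUT_SUBSCRIPTION="projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_ID}"
echo "${INPUT_SUBSCRIPTION}"
```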

Contribution