Project author: gerrard-err

Project description:
Java Utilities for PySpark Kafka Clients
Language: Java
Repository: git://github.com/gerrard-err/kafka-clients-util.git
Created: 2018-01-23T10:01:22Z
Project page: https://github.com/gerrard-err/kafka-clients-util

License: MIT License


Java Utilities for PySpark Kafka Clients (Kerberos)

Getting Started

    git clone https://github.com/gerrard-err/kafka-clients-util.git

Prerequisites

Maven 3, Java 8

Installing

    mvn install

Running the tests

    mvn test

Deployment

    export PYSPARK_SUBMIT_ARGS='--master yarn-client
    --jars .ivy2/jars/kafka-clients-util-1.0.jar
    --driver-class-path .ivy2/jars/kafka-clients-util-1.0.jar
    pyspark-shell'
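
When PySpark is started from a plain Python process rather than from a shell, the same setting can be applied through the environment before the SparkContext is created. The following is a minimal sketch, not part of this project: `build_submit_args` is a hypothetical helper name, and the jar path is simply the one used in the export above.

```python
import os


def build_submit_args(jar_path, master="yarn-client"):
    """Build a PYSPARK_SUBMIT_ARGS value that puts the utility jar on the classpath.

    Hypothetical helper for illustration; the flags mirror the export above.
    """
    parts = [
        "--master", master,
        "--jars", jar_path,
        "--driver-class-path", jar_path,
        "pyspark-shell",  # must stay the last token
    ]
    return " ".join(parts)


# Set the variable before any SparkContext is created, because PySpark
# reads it when it launches the JVM gateway.
os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args(
    ".ivy2/jars/kafka-clients-util-1.0.jar")
```

Keeping the jar path in one argument avoids the two flags drifting apart when the version in the file name changes.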

Example

    from pyspark import SparkContext
    from py4j.protocol import Py4JJavaError

    # Reuses the context created by the pyspark shell if one already exists.
    sc = SparkContext.getOrCreate()

    log4jLogger = sc._jvm.org.apache.log4j
    log = log4jLogger.LogManager.getLogger(__name__)


    def create_kafka_params():
        # Point the JVM at the JAAS configuration used for Kerberos authentication.
        sc._jvm.java.lang.System.setProperty(
            "java.security.auth.login.config",
            "/usr/hdp/current/kafka-broker/conf/kafka_client_jaas.conf")
        kafkaParams = sc._jvm.java.util.HashMap()
        kafkaParams.put("bootstrap.servers", "localhost:6667")
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT")
        kafkaParams.put("acks", "all")
        kafkaParams.put("retries", 1)
        kafkaParams.put("key.serializer",
                        "org.apache.kafka.common.serialization.StringSerializer")
        kafkaParams.put("value.serializer",
                        "org.apache.kafka.common.serialization.StringSerializer")
        return kafkaParams


    def send(message):
        kafkaClientsHelper = None
        try:
            log.info("Creating Kafka producer...")
            # Load the helper through the context class loader so that the
            # jar passed with --jars is visible.
            kafkaClientsHelperClass = sc._jvm.java.lang.Thread.currentThread() \
                .getContextClassLoader() \
                .loadClass("com.github.kafka.clients.KafkaClientsPythonHelper")
            kafkaClientsHelper = kafkaClientsHelperClass.newInstance()
            kafkaClientsHelper.createProducer(create_kafka_params())
            log.info("Sending message...")
            kafkaClientsHelper.send(
                "topic_example",
                sc._jvm.java.util.UUID.randomUUID().toString(),
                message)
        except Py4JJavaError as e:
            # A ClassNotFoundException usually means the jar is not on the classpath.
            if 'ClassNotFoundException' in str(e.java_exception):
                log.error(e.java_exception)
            raise
        except Exception:
            print("Exception")
        finally:
            log.info("Shutting down Kafka producer...")
            # The helper is still None if loading the class failed.
            if kafkaClientsHelper is not None:
                kafkaClientsHelper.shutdownProducer()
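
The create, send, and shutdown sequence in the example can also be wrapped in a context manager, so that the producer is shut down only after it was actually created. This is a sketch under assumptions rather than part of the library: `kafka_producer` and `FakeHelper` are hypothetical names, and `FakeHelper` only stands in for the Java helper object so the snippet runs without a JVM. The three method names are the ones the example calls on `KafkaClientsPythonHelper`.

```python
from contextlib import contextmanager


@contextmanager
def kafka_producer(helper, params):
    """Create the producer on entry and always shut it down on exit."""
    helper.createProducer(params)
    try:
        yield helper
    finally:
        # Runs whether or not the body of the `with` block raised.
        helper.shutdownProducer()


class FakeHelper:
    """Hypothetical stand-in for the Java helper; it only records calls."""

    def __init__(self):
        self.calls = []

    def createProducer(self, params):
        self.calls.append("create")

    def send(self, topic, key, message):
        self.calls.append(("send", topic, key, message))

    def shutdownProducer(self):
        self.calls.append("shutdown")


fake = FakeHelper()
with kafka_producer(fake, {"bootstrap.servers": "localhost:6667"}) as producer:
    producer.send("topic_example", "key-1", "hello")
print(fake.calls)
```

In a real session the first argument would be the helper instance obtained through the class loader, and the second the `HashMap` returned by `create_kafka_params()`.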

Authors

See also the list of contributors who participated in this project.

License

This project is licensed under the MIT License; see the LICENSE file for details.