Truststore和Google Cloud Dataflow


AsuRa
2025-03-12 08:48:59 (5天前)
  1. 我需要用一个


信任
</跨度>
存储在Google中建立SSL Kafka连接


</跨度>
数据流。我可以从存储桶中提供此信息,还是可以将其存储在“本地文件系统”中?

3 条回复
  1. 0# trpnest | 2019-08-31 10-32



    感谢@jkff的解决方案,这是一个实现示例:



    ConsumerFactoryFn实现示例:




    1. private static class ConsumerFactoryFn
      implements SerializableFunction, Consumer>
      {

    2. public Consumer<byte[], byte[]> apply(Map<String, Object> config) 
    3. {
    4.     try 
    5.     {
    6.         Storage storage = StorageOptions.newBuilder()
    7.                 .setProjectId("prj-id-of-your-bucket")
    8.                 .setCredentials(GoogleCredentials.getApplicationDefault())
    9.                 .build()
    10.                 .getService();
    11.         Blob blob = storage.get("your-bucket-name", "pth.to.your.kafka.client.truststore.jks");
    12.         ReadChannel readChannel = blob.reader();
    13.         FileOutputStream fileOuputStream;
    14.         fileOuputStream = new FileOutputStream("/tmp/kafka.client.truststore.jks"); //path where the jks file will be stored
    15.         fileOuputStream.getChannel().transferFrom(readChannel, 0, Long.MAX_VALUE);
    16.         fileOuputStream.close();
    17.         File f = new File("/tmp/kafka.client.truststore.jks"); //assuring the store file exists
    18.         if (f.exists())
    19.         {
    20.             LOG.debug("key exists");
    21.         }
    22.         else
    23.         {
    24.             LOG.error("key does not exist");
    25.         }
    26.     } catch (FileNotFoundException e) {
    27.         // TODO Auto-generated catch block
    28.         LOG.error( e.getMessage());
    29.     } catch (IOException e) {
    30.         // TODO Auto-generated catch block
    31.         LOG.error( e.getMessage());
    32.     }
    33.     config.put("ssl.truststore.location",(Object) "/tmp/kafka.client.truststore.jks" );
    34.     return new KafkaConsumer<byte[], byte[]>(config);
    35. }
    36. }

    37. </code>


    并且不要忘记在你的KafkaIO.read()调用中使用.withConsumerFactoryFn,应该是这样的:




    1. Map configMap = new HashMap();
      configMap.put(“security.protocol”, (Object) SSL”);
      configMap.put(“ssl.truststore.password”, (Object) clientpass”);

    2. p.apply(“ReadFromKafka”, KafkaIO.read()
      .withBootstrapServers(“ip:9093”)
      .withTopic(“pageviews”)
      .withKeyDeserializer(StringDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
      .updateConsumerProperties(configMap)
      .withConsumerFactoryFn(new ConsumerFactoryFn()) etc.

    3. </code>

  2. 1# 那年 | 2019-08-31 10-32



    您可以使用

    KafkaIO.Read.withConsumerFactoryFn

    提供将被调用以创建Kafka使用者的工厂函数。在该功能中,您可以随意做任何您喜欢的事情,例如:您可以从GCS存储桶下载信任存储文件(我建议使用
    <a href =“https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java"rel =” nofollow noreferrer“>
    GcsUtil

    为此,并将其保存到本地磁盘上的临时文件 - AFAIK Kafka本身只支持在本地磁盘上具有此文件。然后手动创建一个

    KafkaConsumer

    并将其指向文件。


登录 后才能参与评论