Kiba作者在这里。谢谢你试试吧!
目前,实现这一目标的最佳方法是创建我称之为“缓冲目标”的东西。 (其中一个版本可能会在某些时候最终出现在Kiba Common)。
(请彻底测试,我今天早上为你写了这个,但是根本没有运行它,虽然我过去使用的是通用版本较少。还要记住这个版本使用内存缓冲区为你的10k行,所以将数字增加到更大的东西将消耗内存。但是也可以创建最少内存消耗的版本,这会在你获得它们时将行写入文件)
class BufferingDestination def initialize(buffer_size:, on_flush:) @buffer = [] @buffer_size @on_flush = on_flush @batch_index = 0 end def write(row) @buffer << row flush if @buffer.size >= buffer_size end def flush on_flush.call(batch_index: @batch_index, rows: @buffer) @batch_index += 1 @buffer.clear end def close flush end end
这是你可以像这样使用的东西,例如这里重复使用 Kiba Common CSV目的地 (虽然你也可以写自己的):
require 'kiba-common/destinations/csv' destination BufferingDestination, buffer_size: 10_000, on_flush: -> { |batch_index, rows| filename = File.join("output-#{sprintf("%08d", batch_index)}") csv = Kiba::Common::Destinations::CSV.new( filename: filename, csv_options: { ... }, headers: %w(my fields here) ) rows.each { |r| csv.write(r) } csv.close }
然后你可以触发你的 COPY 就在 on_flush 生成文件后阻止(如果您希望立即开始上传),或者在 post_process 阻止(但这只能在所有CSV准备就绪后开始,如果您愿意,这可以是确保某种形式的事务全局上传的功能)。
COPY
on_flush
post_process
如果你真的需要这个(但是要小心僵尸线程等),你可以想象并开始一个线程队列来实际并行地处理上传。
另一种方法是使用“多步骤”ETL过程,一个脚本生成CSV,另一个选择它们进行上传,同时运行(这是我在演讲中解释的内容) RubyKaigi 2018 例如)。
让我知道事情对你有用!
我不确定这个确切的问题。但是,我认为你的解决方案似乎总体上是正确的,但很少有建议。
gzip
menifest
copy
Thibaut,我做了类似的事情,除了我将它流式传输到Tempfile,我想......
require 'csv' # @param limit [Integer, 1_000] Number of rows per csv file # @param callback [Proc] Proc taking one argument [CSV/io], that can be used after # each csv file is finished module PacerPro class CSVDestination def initialize(limit: 1_000, callback: ->(obj) { }) @limit = limit @callback = callback @csv = nil @row_count = 0 end # @param row [Hash] returned from transforms def write(row) csv << row.values @row_count += 1 return if row_count < limit self.close end # Called by Kiba when the transform pipeline is finished def close csv.close callback.call(csv) tempfile.unlink @csv = nil @row_count = 0 end private attr_reader :limit, :callback attr_reader :row_count, :tempfile def csv @csv ||= begin @tempfile = Tempfile.new('csv') CSV.open(@tempfile, 'w') end end end end