值得一提的是这里 射线 同样, 它是一个分布式计算框架,它以分布式方式为pandas提供了自己的实现。
只需替换pandas导入,代码应该按原样运行:
# import pandas as pd import ray.dataframe as pd #use pd as usual
可以在这里阅读更多细节:
https://rise.cs.berkeley.edu/blog/pandas-on-ray/
如果您的数据集在1到20GB之间,那么您应该得到一个具有48GB RAM的工作站。然后Pandas可以将整个数据集保存在RAM中。我知道这不是你在这里寻找的答案,但在4GB内存的笔记本电脑上进行科学计算是不合理的。
我通常以这种方式使用数十亿字节的数据 例如我在磁盘上有表,我通过查询,创建数据和追加回读。
值得一读 文档 和 在这个帖子的后期 有关如何存储数据的几点建议。
详细信息将影响您存储数据的方式,例如: 尽可能多地提供细节;我可以帮你建立一个结构。
确保你有 大熊猫至少 0.10.1 安装。
0.10.1
读 迭代文件chunk-by-chunk 和 多个表查询 。
由于pytables被优化为按行进行操作(这是您查询的内容),因此我们将为每组字段创建一个表。通过这种方式,可以轻松选择一小组字段(可以使用大表格,但这样做效率更高......我想我将来可以修复这个限制...这是无论如何更直观): (以下是伪代码。)
import numpy as np import pandas as pd # create a store store = pd.HDFStore('mystore.h5') # this is the key to your storage: # this maps your fields to a specific group, and defines # what you want to have as data_columns. # you might want to create a nice class wrapping this # (as you will want to have this map and its inversion) group_map = dict( A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']), B = dict(fields = ['field_10',...... ], dc = ['field_10']), ..... REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []), ) group_map_inverted = dict() for g, v in group_map.items(): group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))
读取文件并创建存储(基本上做什么 append_to_multiple 一样):
append_to_multiple
for f in files: # read in the file, additional options hmay be necessary here # the chunksize is not strictly necessary, you may be able to slurp each # file into memory in which case just eliminate this part of the loop # (you can also change chunksize if necessary) for chunk in pd.read_table(f, chunksize=50000): # we are going to append to each table by group # we are not going to create indexes at this time # but we *ARE* going to create (some) data_columns # figure out the field groupings for g, v in group_map.items(): # create the frame for this group frame = chunk.reindex(columns = v['fields'], copy = False) # append it store.append(g, frame, index=False, data_columns = v['dc'])
现在你已经拥有了文件中的所有表格(实际上你可以将它们存储在单独的文件中,如果你愿意,你可能需要将文件名添加到group_map,但可能这不是必需的)。
这是您获取列并创建新列的方法:
frame = store.select(group_that_I_want) # you can optionally specify: # columns = a list of the columns IN THAT GROUP (if you wanted to # select only say 3 out of the 20 columns in this sub-table) # and a where clause if you want a subset of the rows # do calculations on this frame new_frame = cool_function_on_frame(frame) # to 'add columns', create a new group (you probably want to # limit the columns in this new_group to be only NEW ones # (e.g. so you don't overlap from the other tables) # add this info to the group_map store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)
当您准备进行post_processing时:
# This may be a bit tricky; and depends what you are actually doing. # I may need to modify this function to be a bit more general: report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)
关于data_columns,您实际上不需要定义 的 任何 强> data_columns;它们允许您根据列子选择行。例如。就像是:
store.select(group, where = ['field_1000=foo', 'field_1001>0'])
在最终报告生成阶段,它们可能对您最感兴趣(实质上,数据列与其他列隔离,如果您定义了很多,这可能会影响效率)。
您可能还想:
如果您有疑问,请告诉我!
我知道这是一个老线程,但我认为 火焰 图书馆值得一试。它是为这些类型的情况而构建的。
的 来自文档: 强>
Blaze将NumPy和Pandas的可用性扩展到分布式和核心外计算。 Blaze提供类似于NumPy ND-Array或Pandas DataFrame的界面,但将这些熟悉的界面映射到Postgres或Spark等各种其他计算引擎上。
的 编辑: 强> 顺便说一下,它由ContinuumIO和NumPy的作者Travis Oliphant支持。
我发现一个有用的技巧 的 大数据 强> 用例是通过将浮点精度降低到32位来减少数据量。它并不适用于所有情况,但在许多应用中,64位精度是过度的,节省2倍的内存是值得的。使一个明显的观点更加明显:
>>> df = pd.DataFrame(np.random.randn(int(1e8), 5)) >>> df.info() <class 'pandas.core.frame.DataFrame'> RangeIndex: 100000000 entries, 0 to 99999999 Data columns (total 5 columns): ... dtypes: float64(5) memory usage: 3.7 GB >>> df.astype(np.float32).info() <class 'pandas.core.frame.DataFrame'> RangeIndex: 100000000 entries, 0 to 99999999 Data columns (total 5 columns): ... dtypes: float32(5) memory usage: 1.9 GB
考虑 Ruffus 如果你去创建一个分解成多个较小文件的数据管道的简单路径。
现在,在问题发生两年后,一个“核心外”的熊猫相当于: DASK 。太棒了!虽然它不支持所有的熊猫功能,但你可以使用它。
还有一个变种
在pandas中完成的许多操作也可以作为db查询完成(sql,mongo)
使用RDBMS或mongodb可以在DB Query中执行一些聚合(针对大数据进行优化,并有效地使用缓存和索引)
之后,您可以使用pandas执行后期处理。
这种方法的优点是你可以获得数据库优化来处理大数据,同时仍然用高级声明性语法定义逻辑 - 而不必处理决定在内存中做什么和做什么的细节核心。
虽然查询语言和pandas不同,但将部分逻辑从一个转换为另一个通常并不复杂。
我发现这有点晚了,但我处理类似的问题(按揭预付款模式)。我的解决方案是跳过pandas HDFStore层并使用直接pytables。我将每列保存为最终文件中的单个HDF5阵列。
我的基本工作流程是首先从数据库中获取CSV文件。我gzip它,所以它不是那么大。然后我将其转换为面向行的HDF5文件,通过在python中迭代它,将每一行转换为实际数据类型,并将其写入HDF5文件。这需要几十分钟,但它不使用任何内存,因为它只是逐行操作。然后我将面向行的HDF5文件“转置”为面向列的HDF5文件。
表转置如下:
def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"): # Get a reference to the input data. tb = h_in.getNode(table_path) # Create the output group to hold the columns. grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1)) for col_name in tb.colnames: logger.debug("Processing %s", col_name) # Get the data. col_data = tb.col(col_name) # Create the output array. arr = h_out.createCArray(grp, col_name, tables.Atom.from_dtype(col_data.dtype), col_data.shape) # Store the data. arr[:] = col_data h_out.flush()
读回来然后看起来像:
def read_hdf5(hdf5_path, group_path="/data", columns=None): """Read a transposed data set from a HDF5 file.""" if isinstance(hdf5_path, tables.file.File): hf = hdf5_path else: hf = tables.openFile(hdf5_path) grp = hf.getNode(group_path) if columns is None: data = [(child.name, child[:]) for child in grp] else: data = [(child.name, child[:]) for child in grp if child.name in columns] # Convert any float32 columns to float64 for processing. for i in range(len(data)): name, vec = data[i] if vec.dtype == np.float32: data[i] = (name, vec.astype(np.float64)) if not isinstance(hdf5_path, tables.file.File): hf.close() return pd.DataFrame.from_items(data)
现在,我通常在拥有大量内存的机器上运行它,所以我可能对内存使用情况不够谨慎。例如,默认情况下,加载操作会读取整个数据集。
这通常适合我,但它有点笨重,我不能使用花哨的pytables魔术。
编辑:这种方法的真正优势在于记录数组的pytables默认值,然后我可以使用h5r将数据加载到R中,而h5r无法处理表。或者,至少,我无法让它加载异构表。
这是pymongo的情况。我还在python中使用sql server,sqlite,HDF,ORM(SQLAlchemy)进行原型设计。首先,pymongo是一个基于文档的数据库,所以每个人都是一个文档( dict 属性)。许多人组成一个集合,你可以有很多集合(人,股市,收入)。
dict
pd.dateframe - &gt; pymongo注意:我用的是 chunksize 在 read_csv 保持5到10k记录(如果更大,pymongo会丢弃套接字)
chunksize
read_csv
aCollection.insert((a[1].to_dict() for a in df.iterrows()))
查询:gt =大于...
pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))
.find() 返回一个迭代器,所以我经常使用 ichunked 切入较小的迭代器。
.find()
ichunked
加入怎么样,因为我通常会将10个数据源粘贴在一起:
aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))
然后(在我的情况下,有时我必须加入 aJoinDF 首先在它“合并”之前。)
aJoinDF
df = pandas.merge(df, aJoinDF, on=aKey, how='left')
然后,您可以通过下面的更新方法将新信息写入主集合。 (逻辑集合与物理数据源)。
collection.update({primarykey:foo},{key:change})
在较小的查找上,只是非规范化。例如,您在文档中有代码,只需添加字段代码文本并执行操作 dict 在创建文档时查找。
现在你有一个基于一个人的好数据集,你可以在每个案例中释放你的逻辑并创建更多属性。最后,你可以阅读大熊猫你的3到内存最大关键指标,并进行枢轴/聚合/数据探索。这对我有300万条记录的数字/大文/类别/代码/花车/ ...
您还可以使用MongoDB中内置的两种方法(MapReduce和聚合框架)。 有关聚合框架的更多信息,请参见此处 ,因为它似乎比MapReduce更容易,并且看起来很方便快速聚合工作。注意我不需要定义我的字段或关系,我可以添加项目到文档。在快速变化的numpy,pandas,python工具集的当前状态下,MongoDB帮助我开始工作:)
正如其他人所指出的那样,经过几年的出现,出现了“核心外”的熊猫等效物: DASK 。虽然dask不是大熊猫及其所有功能的直接替代品,但它有以下几个原因:
Dask是一个灵活的分析计算并行计算库,针对交互式计算工作负载的动态任务调度进行了优化 大数据集合包括并行数组,数据帧和列表,它们将NumPy,Pandas或Python迭代器等常见接口扩展到大于内存或分布式环境,并从笔记本电脑扩展到集群。
Dask强调以下优点: 熟悉:提供并行化的NumPy数组和Pandas DataFrame对象 灵活:为更多自定义工作负载和与其他项目的集成提供任务调度界面。 Native:通过访问PyData堆栈,在Pure Python中启用分布式计算。 快速:以低开销,低延迟和快速数值算法所需的最小序列化运行 向上扩展:在具有1000个内核的集群上弹性运行向下扩展:在单个进程中设置并在笔记本电脑上运行的简单操作 响应:设计时考虑到交互式计算,它提供快速反馈和诊断以帮助人类
Dask强调以下优点:
并添加一个简单的代码示例:
import dask.dataframe as dd df = dd.read_csv('2015-*-*.csv') df.groupby(df.user_id).value.mean().compute()
替换一些像这样的pandas代码:
import pandas as pd df = pd.read_csv('2015-01-01.csv') df.groupby(df.user_id).value.mean()
并且,特别值得注意的是,通过concurrent.futures接口提供了一个用于提交自定义任务的通用:
from dask.distributed import Client client = Client('scheduler:port') futures = [] for fn in filenames: future = client.submit(load, fn) futures.append(future) summary = client.submit(summarize, futures) summary.result()
我最近遇到了类似的问题。我发现只是以块的形式读取数据并附加它,因为我以块的形式将它写入相同的csv。我的问题是根据另一个表中的信息添加日期列,使用某些列的值,如下所示。这可能会帮助那些被dask和hdf5困惑的人,但更熟悉像我这样的熊猫。
def addDateColumn(): """Adds time to the daily rainfall data. Reads the csv as chunks of 100k rows at a time and outputs them, appending as needed, to a single csv. Uses the column of the raster names to get the date. """ df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True, chunksize=100000) #read csv file as 100k chunks '''Do some stuff''' count = 1 #for indexing item in time list for chunk in df: #for each 100k rows newtime = [] #empty list to append repeating times for different rows toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time while count <= toiterate.max(): for i in toiterate: if i ==count: newtime.append(newyears[count]) count+=1 print "Finished", str(chunknum), "chunks" chunk["time"] = newtime #create new column in dataframe based on time outname = "CHIRPS_tanz_time2.csv" #append each output to same csv, using no header chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)