开发中常见的数据格式

发布时间: 更新时间: 总字数:5421 阅读时间:11m 作者: IP上海 分享 网址

开发中常见的数据格式对比:Json/csv/pickle/parquet/feather 等

介绍

CSVJSONParquetAvro
是否是列式
是否可压缩
是否可分拆
易于人们阅读
是否支持复杂数据结构
是否支持 schema 进化
  • JSON(JavaScript Object Notation,JavaScript 对象符号) 是一种轻量级数据交换格式,优点
    • JSON 支持层级结构,简化了在一个文档中存储关联数据和展示复杂关系的难度
    • 大多数语言提供了简化 JSON 序列化的工具库,或内建支持 JSON 序列化/反序列化
    • JSON 支持对象列表,帮助避免列表转换成关系型数据模型时的不确定性
    • JSON 是一种被 NoSQL 数据库广泛使用的文件格式,比如 MongoDB,Couchbase 和 Azure Cosmos DB
    • 目前大部分工具都内置支持 JSON
  • JSON Lines(也称 newline-delimited JSON) 格式要求,是一种方便的结构化数据存储格式,一次处理一条记录
    • UTF-8 Encoding
    • Each Line is a Valid JSON Value
    • Line Separator is ‘\n’
    • Suggested Conventions: .jsonl
      • 压缩 .jsonl.gz or .jsonl.bz2
  • CSV 文件(逗号分割不同列的值)常被使用普通文本格式的系统用作交换它们的表格数据
    • 优点
      • CSV 易于人们理解和手动编辑
      • CSV 提供了简单易懂的信息格式
      • 几乎所有现存的系统都可以处理 CSV
      • CSV 易于实现和解析
      • CSV 格式紧凑
    • 缺点:
      • CSV 只能处理扁平化的数据
      • 不支持设置列的数据类型
      • 没有标准方式表示二进制数据
      • CSV 的导入问题(不区分 NULLnull 引用
      • 对特殊字符的支持很差
      • 缺少标准
  • Excel 有最大行数 1048576 的限制
  • Apache Avro 是由 Hadoop 工作组于 2009 年发布。它是基于行的格式,并且具备高度的可分拆性
  • Apache arrow 是高性能的,用于内存计算的,列式数据存储格式,Python 库为 PyArrow
  • hdf5 设计用于快速 I/O 处理和存储,它是一个高性能的数据管理套件,可以用于存储、管理和处理大型复杂数据
  • jay Datatable 使用 .jay(二进制)格式,使得读取数据集的速度非常快
  • python pickle 模块实现二进制协议,用于序列化和反序列化 Python 对象结构

Parquet

  • Parquet 是一种专为大数据处理系统优化的一种高效、高性能的列式存储文件格式,由 Twitter 和 Cloudera 联合开发,使用包括:Apache Spark、Apache Hive、Apache Flink 和 Presto
  • 特性:
    • 列式存储格式:提高查询性能
    • 高效压缩和编码:降低存储成本,高读写性能
      • 支持的压缩方式:Snappy、Gzip 和 LZO 等
    • 支持 Schema 更新
    • 支持复杂数据类型
  • Parquet 存储格式:
    • 行组是 Parquet 文件中最大的数据单位
      • 一个 Parquet 文件可以有一个或多个行组
    • 列和页
      • 是 Parquet 中存储数据的主要结构
      • 是 Parquet 中最小的数据单位,也是压缩和编码的基本单位,编码技术包括:
        • 字典编码(dictionary encoding)
        • Run-length encoding(RLE)
        • 位打包(bit packing)
import pandas as pd
df = pd.read_csv('example.csv')
df.to_parquet('output.parquet')

Feather

  • Feather 是一种用于存储数据帧的数据格式,高速读写压缩二进制文件
    • 它最初是为了 PythonR 之间快速交互而设计的,初衷很简单,就是尽可能高效地完成数据在内存中转换的效率
pip install feather-format

# 写
feather.write_dataframe(df, 'data.feather')

# 读
df = feather.read_dataframe('data.feather')

# pandas操作方式,写
df.to_feather(path, compression, compression_level)
# -- path:文件路径
# -- compression:是否压缩以及如何压缩,支持(zstd/uncompressde/lz4)三种方式
# -- compression_level:压缩水平(lz4不支持该参数)

# 加载
df = pd.read_feather('data.feather')
df = pd.read_feather(path='data.feather', columns=["a","b","c"])

数据对比

  • 依赖
pip3 install numpy===1.26.3 pandas===1.5.3 tables===3.9.2 fastparquet===2024.5.0 tabulate===0.9.0 -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com

生成数据

generate_data.py ...
import random
import string

import numpy as np
import pandas as pd


# 变量定义
row_num = int(1e7)
col_num = 5
str_len = 4
str_nunique = 10 # 字符串组合数量
# 生成随机数
int_matrix = np.random.randint(0, 100, size=(row_num, col_num))
df = pd.DataFrame(int_matrix, columns=['int_%d' % i for i in range(col_num)])
float_matrix = np.random.rand(row_num, col_num)
df = pd.concat(
    (df, pd.DataFrame(float_matrix, columns=['float_%d' % i for i in range(col_num)])), axis=1)
str_list = [''.join(random.sample(string.ascii_letters, str_len)) for _ in range(str_nunique)]
for i in range(col_num):
    sr = pd.Series(str_list*(row_num//str_nunique)).sample(frac=1, random_state=i)
    df['str_%d' % i] = sr

print(df.info())

# 写入
df.to_csv('./test_csv.csv', index=False)

对比测试源码

pk.py ...
import os
import timeit
import statistics

import pandas as pd
import tables

filename = 'test_csv'
df = pd.read_csv(filename + '.csv')
print(df.sample(5))
print(df.shape)
print(df.dtypes)

compress_list=[
    # CSV + INDEX
    {
      'method_name':"CSV",'save_format':"df.to_csv(filename + '_csv.csv')",
      'read_format':"pd.read_csv(filename + '_csv.csv')",'suffix':"_csv.csv",'method_describe':"有索引的CSV文件格式"
    },
    # CSV - INDEX
    {
      'method_name':"CSV No Index",'save_format':"df.to_csv(filename + '_csv.csv', index=False)",
      'read_format':"pd.read_csv(filename + '_csv.csv')",'suffix':"_csv.csv",'method_describe':"无索引的CSV文件格式"
    },
    # CSV No Index (GZIP)
    {
      'method_name':"CSV No Index (GZIP)",'save_format':"df.to_csv(filename + '.gzip', index=False, compression='gzip')",
      'read_format':"pd.read_csv(filename + '.gzip', compression='gzip')",'suffix':".gzip",'method_describe':"gzip压缩格式的无索引CSV"
    },
    # CSV No Index (BZ2)
    {
      'method_name':"CSV No Index (BZ2)",'save_format':"df.to_csv(filename + '.bz2', index=False, compression='bz2')",
      'read_format':"pd.read_csv(filename + '.bz2', compression='bz2')",'suffix':".bz2",'method_describe':"bz2压缩格式的无索引CSV"
    },
    # CSV No Index (ZIP)
    {
      'method_name':"CSV No Index (ZIP)",'save_format':"df.to_csv(filename + '.zip', index=False, compression='zip')",
      'read_format':"pd.read_csv(filename + '.zip', compression='zip')",'suffix':".zip",'method_describe':"zip压缩格式的无索引CSV"
    },
    # CSV No Index (XZ)
    {
      'method_name':"CSV No Index (XZ)",'save_format':"df.to_csv(filename + '.xz', index=False, compression='xz')",
      'read_format':"pd.read_csv(filename + '.xz', compression='xz')",'suffix':".xz",'method_describe':"xz压缩格式的无索引CSV"
    },

    # JSON
    {
      'method_name':"JSON",'save_format':"df.to_json(filename + '.json')",
      'read_format':"pd.read_json(filename + '.json')",'suffix':".json",'method_describe':"json序列化"
    },
    # JSON (GZIP)
    {
      'method_name':"JSON(GZIP)",'save_format':"df.to_json(filename + '.json', compression='gzip')",
      'read_format':"pd.read_json(filename + '.json', compression='gzip')",'suffix':".json",'method_describe':"gzip压缩格式的json序列化"
    },
    # JSON (BZ2)
    {
      'method_name':"JSON(BZ2)",'save_format':"df.to_json(filename + '.json', compression='bz2')",
      'read_format':"pd.read_json(filename + '.json', compression='bz2')",'suffix':".json",'method_describe':"bz2压缩格式的json序列化"
    },
    # JSON (ZIP)
    {
      'method_name':"JSON(ZIP)",'save_format':"df.to_json(filename + '.json', compression='zip')",
      'read_format':"pd.read_json(filename + '.json', compression='zip')",'suffix':".json",'method_describe':"zip压缩格式的json序列化"
    },
    # JSON (XZ)
    {
      'method_name':"JSON(XZ)",'save_format':"df.to_json(filename + '.json', compression='xz')",
      'read_format':"pd.read_json(filename + '.json', compression='xz')",'suffix':".json",'method_describe':"xz压缩格式的json序列化"
    },
    # Pickle
    {
      'method_name':"Pickle",'save_format':"df.to_pickle(filename + '.pkl')",
      'read_format':"pd.read_pickle(filename + '.pkl')",'suffix':".pkl",'method_describe':"二进制序列化Pickle"
    },
    # Pickle (GZIP)
    {
      'method_name':"Pickle (GZIP)",'save_format':"df.to_pickle(filename + '.pkl', compression='gzip')",
      'read_format':"pd.read_pickle(filename + '.pkl', compression='gzip')",'suffix':".pkl",'method_describe':"gzip压缩的序列化Pickle"
    },
    # Pickle (BZ2)
    {
      'method_name':"Pickle (BZ2)",'save_format':"df.to_pickle(filename + '.pkl', compression='bz2')",
      'read_format':"pd.read_pickle(filename + '.pkl', compression='bz2')",'suffix':".pkl",'method_describe':"bz2压缩的序列化Pickle"
    },
    # Pickle (ZIP)
    {
      'method_name':"Pickle (ZIP)",'save_format':"df.to_pickle(filename + '.pkl', compression='zip')",
      'read_format':"pd.read_pickle(filename + '.pkl', compression='zip')",'suffix':".pkl",'method_describe':"zip压缩的序列化Pickle"
    },
    # Pickle (XZ)
    {
      'method_name':"Pickle (XZ)",'save_format':"df.to_pickle(filename + '.pkl', compression='xz')",
      'read_format':"pd.read_pickle(filename + '.pkl', compression='xz')",'suffix':".pkl",'method_describe':"xz压缩的序列化Pickle"
    },
    # HDF+不压缩
    {
      'method_name':"HDF+不压缩",'save_format':"df.to_hdf(filename + '.h5', key='key', mode='w',complevel=0)",
      'read_format':"pd.read_hdf(filename + '.h5', key='key', mode='r')",'suffix':".h5",'method_describe':"不压缩的HDF5格式"
    },
    # HDF+浅压缩
    {
      'method_name':"HDF+浅压缩",'save_format':"df.to_hdf(filename + '.h5', key='key', mode='w',complevel=3)",
      'read_format':"pd.read_hdf(filename + '.h5', key='key', mode='r')",'suffix':".h5",'method_describe':"3级压缩的HDF5格式"
    },
    # HDF+深压缩
    {
      'method_name':"HDF+深压缩",'save_format':"df.to_hdf(filename + '.h5', key='key', mode='w',complevel=6)",
      'read_format':"pd.read_hdf(filename + '.h5', key='key', mode='r')",'suffix':".h5",'method_describe':"6级压缩的HDF5格式"
    },
    # HDF+极限压缩
    {
      'method_name':"HDF+极限压缩",'save_format':"df.to_hdf(filename + '.h5', key='key', mode='w',complevel=9)",
      'read_format':"pd.read_hdf(filename + '.h5', key='key', mode='r')",'suffix':".h5",'method_describe':"9级压缩的HDF5格式"
    },
    # Parquet(snappy)
    {
      'method_name':"Parquet(snappy)",'save_format':"df.to_parquet(filename + '.parquet', engine='fastparquet', compression='snappy')",
      'read_format':"pd.read_parquet(filename + '.parquet')",'suffix':".parquet",'method_describe':"snappy压缩的Parquet格式"
    },
    # Parquet(gzip)
    {
      'method_name':"Parquet(gzip)",'save_format':"df.to_parquet(filename + '.parquet', engine='fastparquet', compression='gzip')",
      'read_format':"pd.read_parquet(filename + '.parquet')",'suffix':".parquet",'method_describe':"gzip压缩的Parquet格式"
    },
    # Parquet(brotli)
    {
      'method_name':"Parquet(brotli)",'save_format':"df.to_parquet(filename + '.parquet', engine='fastparquet', compression='brotli')",
      'read_format':"pd.read_parquet(filename + '.parquet')",'suffix':".parquet",'method_describe':"brotli压缩的Parquet格式"
    },
]

def average(l):
    sum = 0
    for i in l:
        sum += i
    return round(sum/len(l), 5)

def compress_performance(df_results, compress_detail):
    #---saving---
    # result_save = %timeit -n5 -r5 -o eval(compress_detail['save_format'])
    result_save = timeit.repeat(
      "eval(compress_detail['save_format'])",
      repeat=5,
      number=5,
      globals=globals())
    #---get the size of file---
    file_size = os.path.getsize(filename + compress_detail['suffix']) / 1024**2
    #---load---
    # result_read = %timeit -n5 -r5 -o eval(compress_detail['read_format'])
    result_read = timeit.repeat(
      "eval(compress_detail['read_format'])",
      repeat=5,
      number=5,
      globals=globals())
    #---save the result to the dataframe---
    # import pdb; pdb.set_trace()
    row = {
        'method_name': compress_detail['method_name'],
        'file_size': file_size,
        'write_time_mean': average(result_save),
        'write_time_std': round(statistics.stdev(result_save)),
        'read_time_mean':  average(result_read),
        'read_time_std': round(statistics.stdev(result_read)),
        'method_describe': compress_detail['method_describe'],
      }
    # return df_results.append(pd.Series(row), ignore_index = True)
    return pd.concat([df_results, pd.DataFrame.from_records([row])], ignore_index=True)

compress_results = pd.DataFrame(
  columns=[
    'method_name','file_size', 'write_time_mean', 'write_time_std',
    'read_time_mean','read_time_std','method_describe'
  ])
for compress_detail in compress_list:
    print('start check compress_method: '+compress_detail['method_name'])
    compress_results = compress_performance(compress_results, compress_detail)

df = compress_results.copy()
# df = df.apply(lambda x: x.str.replace('\n', '<br>') if x.dtype == 'object' else x)
md_table = df.to_markdown()
print(md_table)

因机器性能问题,代码调整如下:

# generate_data.py
# row_num = int(1e7)
row_num = int(1e6)

# pk.py
      repeat=3,
      number=3,

$ ls -lhart
648M ... test_csv.csv

测试结果

method_namefile_sizewrite_time_meanwrite_time_stdread_time_meanread_time_stdmethod_describe
0CSV134.22724.667813.979020有索引的 CSV 文件格式
1CSV No Index127.65724.65303.517840无索引的 CSV 文件格式
2CSV No Index (GZIP)50.837571.44515.181910gzip 压缩格式的无索引 CSV
3CSV No Index (BZ2)41.549262.0258224.87270bz2 压缩格式的无索引 CSV
4CSV No Index (ZIP)51.089348.49215.396470zip 压缩格式的无索引 CSV
5CSV No Index (XZ)45.2566475.976215.33390xz 压缩格式的无索引 CSV
6JSON235.8237.02122045.82621json 序列化
7JSON(GZIP)66.8046148.5252653.1314gzip 压缩格式的 json 序列化
8JSON(BZ2)58.065570.432587.041413bz2 压缩格式的 json 序列化
9JSON(ZIP)67.017180.82183350.09221zip 压缩格式的 json 序列化
10JSON(XZ)39.0152754.8961657.90881xz 压缩格式的 json 序列化
11Pickle96.67140.8261400.502360二进制序列化 Pickle
12Pickle (GZIP)42.1024102.53641.550850gzip 压缩的序列化 Pickle
13Pickle (BZ2)40.726538.1199014.95110bz2 压缩的序列化 Pickle
14Pickle (ZIP)42.754213.532202.168210zip 压缩的序列化 Pickle
15Pickle (XZ)39.0732170.84159.660430xz 压缩的序列化 Pickle
16HDF+不压缩104.3471.3546200.947640不压缩的 HDF5 格式
17HDF+浅压缩57.19924.123401.4727403 级压缩的 HDF5 格式
18HDF+深压缩56.92944.8675801.6272806 级压缩的 HDF5 格式
19HDF+极限压缩56.92874.7793301.611709 级压缩的 HDF5 格式
20Parquet(snappy)49.93872.8481701.518930snappy 压缩的 Parquet 格式
21Parquet(gzip)42.836217.061402.460680gzip 压缩的 Parquet 格式
22Parquet(brotli)39.5319448.884233.025250brotli 压缩的 Parquet 格式

结论

  • 纯 CSV 文件在去除索引后,文件大小、读存速度均有改善,但不多
  • JSON 序列化后的原始数据较大,压缩后的写入时间偏长(不推荐)
  • 相比于 CSV 文件,Pickle 格式的原始文件大小会大一些,但是经过压缩后,最终的文反而更小
    • XZ 算法的压缩率更高,相应的耗时略有增加
    • BZ2 + Pickle 是所有方案中压缩率较高的一种方案,但相应的存储和读取耗时也都偏高,适合极致压缩的场景
    • ZIP 虽然压缩率略低一些,但是存储耗时有显著优势
  • Parquet 格式的存储,较好的压缩率,并且在存储和读取耗时有一定的优势(推荐)

参考

  1. https://www.jianshu.com/p/80c1cb6ccc74
  2. https://www.cnblogs.com/harrylyx/p/15141986.html
  3. https://banxian-w.com/article/2023/2/4/2472.html
Home Archives Categories Tags Statistics
本文总阅读量 次 本站总访问量 次 本站总访客数