开发中常见的数据格式

发布时间: 更新时间: 总字数:5421 阅读时间:11m 作者: IP上海 分享 网址

开发中常见的数据格式对比:Json/csv/pickle/parquet/feather 等

介绍

CSV JSON Parquet Avro
是否是列式
是否可压缩
是否可分拆
易于人们阅读
是否支持复杂数据结构
是否支持 schema 进化
  • JSON(JavaScript Object Notation,JavaScript 对象符号) 是一种轻量级数据交换格式,优点
    • JSON 支持层级结构,简化了在一个文档中存储关联数据和展示复杂关系的难度
    • 大多数语言提供了简化 JSON 序列化的工具库,或内建支持 JSON 序列化/反序列化
    • JSON 支持对象列表,帮助避免列表转换成关系型数据模型时的不确定性
    • JSON 是一种被 NoSQL 数据库广泛使用的文件格式,比如 MongoDB,Couchbase 和 Azure Cosmos DB
    • 目前大部分工具都内置支持 JSON
  • JSON Lines(也称 newline-delimited JSON) 格式要求,是一种方便的结构化数据存储格式,一次处理一条记录
    • UTF-8 Encoding
    • Each Line is a Valid JSON Value
    • Line Separator is ‘\n’
    • Suggested Conventions: .jsonl
      • 压缩 .jsonl.gz or .jsonl.bz2
  • CSV 文件(逗号分割不同列的值)常被使用普通文本格式的系统用作交换它们的表格数据
    • 优点
      • CSV 易于人们理解和手动编辑
      • CSV 提供了简单易懂的信息格式
      • 几乎所有现存的系统都可以处理 CSV
      • CSV 易于实现和解析
      • CSV 格式紧凑
    • 缺点:
      • CSV 只能处理扁平化的数据
      • 不支持设置列的数据类型
      • 没有标准方式表示二进制数据
      • CSV 的导入问题(不区分 NULLnull 引用
      • 对特殊字符的支持很差
      • 缺少标准
  • Excel 有最大行数 1048576 的限制
  • Apache Avro 是由 Hadoop 工作组于 2009 年发布。它是基于行的格式,并且具备高度的可分拆性
  • Apache arrow 是高性能的,用于内存计算的,列式数据存储格式,Python 库为 PyArrow
  • hdf5 设计用于快速 I/O 处理和存储,它是一个高性能的数据管理套件,可以用于存储、管理和处理大型复杂数据
  • jay Datatable 使用 .jay(二进制)格式,使得读取数据集的速度非常快
  • python pickle 模块实现二进制协议,用于序列化和反序列化 Python 对象结构

Parquet

  • Parquet 是一种专为大数据处理系统优化的一种高效、高性能的列式存储文件格式,由 Twitter 和 Cloudera 联合开发,使用包括:Apache Spark、Apache Hive、Apache Flink 和 Presto
  • 特性:
    • 列式存储格式:提高查询性能
    • 高效压缩和编码:降低存储成本,高读写性能
      • 支持的压缩方式:Snappy、Gzip 和 LZO 等
    • 支持 Schema 更新
    • 支持复杂数据类型
  • Parquet 存储格式:
    • 行组是 Parquet 文件中最大的数据单位
      • 一个 Parquet 文件可以有一个或多个行组
    • 列和页
      • 是 Parquet 中存储数据的主要结构
      • 是 Parquet 中最小的数据单位,也是压缩和编码的基本单位,编码技术包括:
        • 字典编码(dictionary encoding)
        • Run-length encoding(RLE)
        • 位打包(bit packing)
import pandas as pd
df = pd.read_csv('example.csv')
df.to_parquet('output.parquet')

Feather

  • Feather 是一种用于存储数据帧的数据格式,高速读写压缩二进制文件
    • 它最初是为了 PythonR 之间快速交互而设计的,初衷很简单,就是尽可能高效地完成数据在内存中转换的效率
pip install feather-format

# 写
feather.write_dataframe(df, 'data.feather')

# 读
df = feather.read_dataframe('data.feather')

# pandas操作方式,写
df.to_feather(path, compression, compression_level)
# -- path:文件路径
# -- compression:是否压缩以及如何压缩,支持(zstd/uncompressde/lz4)三种方式
# -- compression_level:压缩水平(lz4不支持该参数)

# 加载
df = pd.read_feather('data.feather')
df = pd.read_feather(path='data.feather', columns=["a","b","c"])

数据对比

  • 依赖
pip3 install numpy===1.26.3 pandas===1.5.3 tables===3.9.2 fastparquet===2024.5.0 tabulate===0.9.0 -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com

生成数据

generate_data.py ...
import random
import string

import numpy as np
import pandas as pd


# 变量定义
row_num = int(1e7)
col_num = 5
str_len = 4
str_nunique = 10 # 字符串组合数量
# 生成随机数
int_matrix = np.random.randint(0, 100, size=(row_num, col_num))
df = pd.DataFrame(int_matrix, columns=['int_%d' % i for i in range(col_num)])
float_matrix = np.random.rand(row_num, col_num)
df = pd.concat(
    (df, pd.DataFrame(float_matrix, columns=['float_%d' % i for i in range(col_num)])), axis=1)
str_list = [''.join(random.sample(string.ascii_letters, str_len)) for _ in range(str_nunique)]
for i in range(col_num):
    sr = pd.Series(str_list*(row_num//str_nunique)).sample(frac=1, random_state=i)
    df['str_%d' % i] = sr

print(df.info())

# 写入
df.to_csv('./test_csv.csv', index=False)

对比测试源码

pk.py ...
import os
import timeit
import statistics

import pandas as pd
import tables

filename = 'test_csv'
df = pd.read_csv(filename + '.csv')
print(df.sample(5))
print(df.shape)
print(df.dtypes)

compress_list=[
    # CSV + INDEX
    {
      'method_name':"CSV",'save_format':"df.to_csv(filename + '_csv.csv')",
      'read_format':"pd.read_csv(filename + '_csv.csv')",'suffix':"_csv.csv",'method_describe':"有索引的CSV文件格式"
    },
    # CSV - INDEX
    {
      'method_name':"CSV No Index",'save_format':"df.to_csv(filename + '_csv.csv', index=False)",
      'read_format':"pd.read_csv(filename + '_csv.csv')",'suffix':"_csv.csv",'method_describe':"无索引的CSV文件格式"
    },
    # CSV No Index (GZIP)
    {
      'method_name':"CSV No Index (GZIP)",'save_format':"df.to_csv(filename + '.gzip', index=False, compression='gzip')",
      'read_format':"pd.read_csv(filename + '.gzip', compression='gzip')",'suffix':".gzip",'method_describe':"gzip压缩格式的无索引CSV"
    },
    # CSV No Index (BZ2)
    {
      'method_name':"CSV No Index (BZ2)",'save_format':"df.to_csv(filename + '.bz2', index=False, compression='bz2')",
      'read_format':"pd.read_csv(filename + '.bz2', compression='bz2')",'suffix':".bz2",'method_describe':"bz2压缩格式的无索引CSV"
    },
    # CSV No Index (ZIP)
    {
      'method_name':"CSV No Index (ZIP)",'save_format':"df.to_csv(filename + '.zip', index=False, compression='zip')",
      'read_format':"pd.read_csv(filename + '.zip', compression='zip')",'suffix':".zip",'method_describe':"zip压缩格式的无索引CSV"
    },
    # CSV No Index (XZ)
    {
      'method_name':"CSV No Index (XZ)",'save_format':"df.to_csv(filename + '.xz', index=False, compression='xz')",
      'read_format':"pd.read_csv(filename + '.xz', compression='xz')",'suffix':".xz",'method_describe':"xz压缩格式的无索引CSV"
    },

    # JSON
    {
      'method_name':"JSON",'save_format':"df.to_json(filename + '.json')",
      'read_format':"pd.read_json(filename + '.json')",'suffix':".json",'method_describe':"json序列化"
    },
    # JSON (GZIP)
    {
      'method_name':"JSON(GZIP)",'save_format':"df.to_json(filename + '.json', compression='gzip')",
      'read_format':"pd.read_json(filename + '.json', compression='gzip')",'suffix':".json",'method_describe':"gzip压缩格式的json序列化"
    },
    # JSON (BZ2)
    {
      'method_name':"JSON(BZ2)",'save_format':"df.to_json(filename + '.json', compression='bz2')",
      'read_format':"pd.read_json(filename + '.json', compression='bz2')",'suffix':".json",'method_describe':"bz2压缩格式的json序列化"
    },
    # JSON (ZIP)
    {
      'method_name':"JSON(ZIP)",'save_format':"df.to_json(filename + '.json', compression='zip')",
      'read_format':"pd.read_json(filename + '.json', compression='zip')",'suffix':".json",'method_describe':"zip压缩格式的json序列化"
    },
    # JSON (XZ)
    {
      'method_name':"JSON(XZ)",'save_format':"df.to_json(filename + '.json', compression='xz')",
      'read_format':"pd.read_json(filename + '.json', compression='xz')",'suffix':".json",'method_describe':"xz压缩格式的json序列化"
    },
    # Pickle
    {
      'method_name':"Pickle",'save_format':"df.to_pickle(filename + '.pkl')",
      'read_format':"pd.read_pickle(filename + '.pkl')",'suffix':".pkl",'method_describe':"二进制序列化Pickle"
    },
    # Pickle (GZIP)
    {
      'method_name':"Pickle (GZIP)",'save_format':"df.to_pickle(filename + '.pkl', compression='gzip')",
      'read_format':"pd.read_pickle(filename + '.pkl', compression='gzip')",'suffix':".pkl",'method_describe':"gzip压缩的序列化Pickle"
    },
    # Pickle (BZ2)
    {
      'method_name':"Pickle (BZ2)",'save_format':"df.to_pickle(filename + '.pkl', compression='bz2')",
      'read_format':"pd.read_pickle(filename + '.pkl', compression='bz2')",'suffix':".pkl",'method_describe':"bz2压缩的序列化Pickle"
    },
    # Pickle (ZIP)
    {
      'method_name':"Pickle (ZIP)",'save_format':"df.to_pickle(filename + '.pkl', compression='zip')",
      'read_format':"pd.read_pickle(filename + '.pkl', compression='zip')",'suffix':".pkl",'method_describe':"zip压缩的序列化Pickle"
    },
    # Pickle (XZ)
    {
      'method_name':"Pickle (XZ)",'save_format':"df.to_pickle(filename + '.pkl', compression='xz')",
      'read_format':"pd.read_pickle(filename + '.pkl', compression='xz')",'suffix':".pkl",'method_describe':"xz压缩的序列化Pickle"
    },
    # HDF+不压缩
    {
      'method_name':"HDF+不压缩",'save_format':"df.to_hdf(filename + '.h5', key='key', mode='w',complevel=0)",
      'read_format':"pd.read_hdf(filename + '.h5', key='key', mode='r')",'suffix':".h5",'method_describe':"不压缩的HDF5格式"
    },
    # HDF+浅压缩
    {
      'method_name':"HDF+浅压缩",'save_format':"df.to_hdf(filename + '.h5', key='key', mode='w',complevel=3)",
      'read_format':"pd.read_hdf(filename + '.h5', key='key', mode='r')",'suffix':".h5",'method_describe':"3级压缩的HDF5格式"
    },
    # HDF+深压缩
    {
      'method_name':"HDF+深压缩",'save_format':"df.to_hdf(filename + '.h5', key='key', mode='w',complevel=6)",
      'read_format':"pd.read_hdf(filename + '.h5', key='key', mode='r')",'suffix':".h5",'method_describe':"6级压缩的HDF5格式"
    },
    # HDF+极限压缩
    {
      'method_name':"HDF+极限压缩",'save_format':"df.to_hdf(filename + '.h5', key='key', mode='w',complevel=9)",
      'read_format':"pd.read_hdf(filename + '.h5', key='key', mode='r')",'suffix':".h5",'method_describe':"9级压缩的HDF5格式"
    },
    # Parquet(snappy)
    {
      'method_name':"Parquet(snappy)",'save_format':"df.to_parquet(filename + '.parquet', engine='fastparquet', compression='snappy')",
      'read_format':"pd.read_parquet(filename + '.parquet')",'suffix':".parquet",'method_describe':"snappy压缩的Parquet格式"
    },
    # Parquet(gzip)
    {
      'method_name':"Parquet(gzip)",'save_format':"df.to_parquet(filename + '.parquet', engine='fastparquet', compression='gzip')",
      'read_format':"pd.read_parquet(filename + '.parquet')",'suffix':".parquet",'method_describe':"gzip压缩的Parquet格式"
    },
    # Parquet(brotli)
    {
      'method_name':"Parquet(brotli)",'save_format':"df.to_parquet(filename + '.parquet', engine='fastparquet', compression='brotli')",
      'read_format':"pd.read_parquet(filename + '.parquet')",'suffix':".parquet",'method_describe':"brotli压缩的Parquet格式"
    },
]

def average(l):
    sum = 0
    for i in l:
        sum += i
    return round(sum/len(l), 5)

def compress_performance(df_results, compress_detail):
    #---saving---
    # result_save = %timeit -n5 -r5 -o eval(compress_detail['save_format'])
    result_save = timeit.repeat(
      "eval(compress_detail['save_format'])",
      repeat=5,
      number=5,
      globals=globals())
    #---get the size of file---
    file_size = os.path.getsize(filename + compress_detail['suffix']) / 1024**2
    #---load---
    # result_read = %timeit -n5 -r5 -o eval(compress_detail['read_format'])
    result_read = timeit.repeat(
      "eval(compress_detail['read_format'])",
      repeat=5,
      number=5,
      globals=globals())
    #---save the result to the dataframe---
    # import pdb; pdb.set_trace()
    row = {
        'method_name': compress_detail['method_name'],
        'file_size': file_size,
        'write_time_mean': average(result_save),
        'write_time_std': round(statistics.stdev(result_save)),
        'read_time_mean':  average(result_read),
        'read_time_std': round(statistics.stdev(result_read)),
        'method_describe': compress_detail['method_describe'],
      }
    # return df_results.append(pd.Series(row), ignore_index = True)
    return pd.concat([df_results, pd.DataFrame.from_records([row])], ignore_index=True)

compress_results = pd.DataFrame(
  columns=[
    'method_name','file_size', 'write_time_mean', 'write_time_std',
    'read_time_mean','read_time_std','method_describe'
  ])
for compress_detail in compress_list:
    print('start check compress_method: '+compress_detail['method_name'])
    compress_results = compress_performance(compress_results, compress_detail)

df = compress_results.copy()
# df = df.apply(lambda x: x.str.replace('\n', '<br>') if x.dtype == 'object' else x)
md_table = df.to_markdown()
print(md_table)

因机器性能问题,代码调整如下:

# generate_data.py
# row_num = int(1e7)
row_num = int(1e6)

# pk.py
      repeat=3,
      number=3,

$ ls -lhart
648M ... test_csv.csv

测试结果

method_name file_size write_time_mean write_time_std read_time_mean read_time_std method_describe
0 CSV 134.227 24.6678 1 3.97902 0 有索引的 CSV 文件格式
1 CSV No Index 127.657 24.653 0 3.51784 0 无索引的 CSV 文件格式
2 CSV No Index (GZIP) 50.8375 71.445 1 5.18191 0 gzip 压缩格式的无索引 CSV
3 CSV No Index (BZ2) 41.5492 62.0258 2 24.8727 0 bz2 压缩格式的无索引 CSV
4 CSV No Index (ZIP) 51.0893 48.492 1 5.39647 0 zip 压缩格式的无索引 CSV
5 CSV No Index (XZ) 45.2566 475.976 2 15.3339 0 xz 压缩格式的无索引 CSV
6 JSON 235.823 7.02122 0 45.8262 1 json 序列化
7 JSON(GZIP) 66.8046 148.525 26 53.131 4 gzip 压缩格式的 json 序列化
8 JSON(BZ2) 58.0655 70.432 5 87.0414 13 bz2 压缩格式的 json 序列化
9 JSON(ZIP) 67.0171 80.8218 33 50.0922 1 zip 压缩格式的 json 序列化
10 JSON(XZ) 39.0152 754.896 16 57.9088 1 xz 压缩格式的 json 序列化
11 Pickle 96.6714 0.82614 0 0.50236 0 二进制序列化 Pickle
12 Pickle (GZIP) 42.1024 102.536 4 1.55085 0 gzip 压缩的序列化 Pickle
13 Pickle (BZ2) 40.7265 38.1199 0 14.9511 0 bz2 压缩的序列化 Pickle
14 Pickle (ZIP) 42.7542 13.5322 0 2.16821 0 zip 压缩的序列化 Pickle
15 Pickle (XZ) 39.0732 170.84 15 9.66043 0 xz 压缩的序列化 Pickle
16 HDF+不压缩 104.347 1.35462 0 0.94764 0 不压缩的 HDF5 格式
17 HDF+浅压缩 57.1992 4.1234 0 1.47274 0 3 级压缩的 HDF5 格式
18 HDF+深压缩 56.9294 4.86758 0 1.62728 0 6 级压缩的 HDF5 格式
19 HDF+极限压缩 56.9287 4.77933 0 1.6117 0 9 级压缩的 HDF5 格式
20 Parquet(snappy) 49.9387 2.84817 0 1.51893 0 snappy 压缩的 Parquet 格式
21 Parquet(gzip) 42.8362 17.0614 0 2.46068 0 gzip 压缩的 Parquet 格式
22 Parquet(brotli) 39.5319 448.884 23 3.02525 0 brotli 压缩的 Parquet 格式

结论

  • 纯 CSV 文件在去除索引后,文件大小、读存速度均有改善,但不多
  • JSON 序列化后的原始数据较大,压缩后的写入时间偏长(不推荐)
  • 相比于 CSV 文件,Pickle 格式的原始文件大小会大一些,但是经过压缩后,最终的文反而更小
    • XZ 算法的压缩率更高,相应的耗时略有增加
    • BZ2 + Pickle 是所有方案中压缩率较高的一种方案,但相应的存储和读取耗时也都偏高,适合极致压缩的场景
    • ZIP 虽然压缩率略低一些,但是存储耗时有显著优势
  • Parquet 格式的存储,较好的压缩率,并且在存储和读取耗时有一定的优势(推荐)

参考

  1. https://www.jianshu.com/p/80c1cb6ccc74
  2. https://www.cnblogs.com/harrylyx/p/15141986.html
  3. https://banxian-w.com/article/2023/2/4/2472.html
Home Archives Categories Tags Statistics
本文总阅读量 次 本站总访问量 次 本站总访客数