python

【python】大量のcsvファイルを高速に読み込む方法

はじめに

Pythonで大量のcsvファイルを読み込む方法について、読み込み速度の比較検討した備忘録です。

CSVファイル単体での読み込み方法は前回の記事をご覧ください。

先に結論から言うと、確認した範囲で大量のcsvファイルを読み込む高速に読み込む方法は、

シングルプロセスで実行する場合はテスト⑤の「pandas.read_csvをリスト内包表記で実行してpandas.concatで結合」する方法、

CPU使用率が大幅に上がっても良い場合、マルチプロセスで実行する方法のテスト⑥「pandas.read_csvをmapで実行してnumpy.vstackで結合」する方法となりました。

環境とテストデータ

・Windows 10 Home 64bit

  • CPU Intel Core i5-8250U CPU@ 1.60GHz
  • メモリ 8GB
  • SSD 512GB
  • python 3.6.5

テストデータは100行100列の乱数(0~1)でcsvファイルを1000個作成します。

テストデータ作成のコード

#テストデータ作成のコード
import numpy as np 
import pandas as pd

#乱数作成
for i in range(1000):
    testdata = np.random.rand(100,100)      # 0〜1の乱数で 100x100 の行列を生成
    df = pd.DataFrame(testdata)             #dataframeに変換
    filename = 'dammydata' + str(i).zfill(5) + '.csv'
    df.to_csv(filename , index=False)       #csvに保存


↓このようなcsvデータが1000個作成されました。これをテストデータとします。

1個のcsvファイルサイズは188 KB (193,079 バイト)です。

python_csv_data
csv_files

python実行の作業ディレクトリにcsvファイル1000個作成されます。予め新規フォルダ作成して作業ディレクトリを変更してから実行しましょう。

評価

csv読み込みコード

テスト①pandas.read_csvをforでループしてpandas.concatで結合

単体のcsvファイルの場合、numpy.loadtxtよりもpandas.read_csvの方が高速に読み込むことができます。

そこでまずは、単純にfor文を使ってpandas.read_csvを全csvファイル読み込む方法です。

#pandas.read_csv for_loop
import time
import glob
import numpy as np 
import pandas as pd

#for_loop
def readcsv_for_loop(fileslist):
    for i,file in enumerate(fileslist):
        df_tmp = pd.read_csv(file)
        
        if i == 0:
            df = df_tmp
        else:
            df = pd.concat([df, df_tmp])
    return df


if __name__ == "__main__":
    
    allfiles = sorted(glob.glob('*.csv', recursive=True))
    
    start = time.time()
    df = readcsv_for_loop(allfiles)
    process_time = time.time() - start
    print('csv読み込み時間:{:.3f}s'.format(process_time))

実行結果

csv読み込み時間:46.762s

全csvファイルを読み込むのに46.762秒かかりました。この結果をベンチマークとします。

for文の処理速度は遅いので、次はforの代わりにmapを使用します。

テスト②pandas.read_csvをmapで実行してpandas.concatで結合

for文を使わずにmapにて高速化を図る。

#pandas.read_csv map
import time
import glob
import numpy as np 
import pandas as pd

#map(pd.concat)
def readcsv_map(fileslist):
    df = pd.concat(map(pdreadcsv, fileslist))
    return df

#csv1個読み込み(map関数用)
def pdreadcsv(csv_path):
    return pd.read_csv(csv_path)


if __name__ == "__main__":
    
    allfiles = sorted(glob.glob('*.csv', recursive=True))
    
    start = time.time()
    df = readcsv_map(allfiles)
    process_time = time.time() - start
    print('csv読み込み時間:{:.3f}s'.format(process_time))

実行結果

csv読み込み時間:15.187s

15.187sでした。テスト①のfor文よりもmapを使う方が3倍ほど高速に読み込みできました。

テスト③pandas.read_csvをリスト内包表記で実行してpandas.concatで結合

mapの代わりにリスト内包表記を使ってみます。

#pandas.read_csv list comprehension
import time
import glob
import numpy as np 
import pandas as pd

#map(pd.concat)
def readcsv_map(fileslist):
    df = pd.concat([pd.read_csv(file) for file in fileslist])
    return df


if __name__ == "__main__":
    
    allfiles = sorted(glob.glob('*.csv', recursive=True))
    
    start = time.time()
    df = readcsv_map(allfiles)
    process_time = time.time() - start
    print('csv読み込み時間:{:.3f}s'.format(process_time))

実行結果

csv読み込み時間:14.982s

14.982sでした。テスト②とほぼ変わりません。誤差の範囲だと思います。

コードはテスト②よりすっきりしました。

テスト④マルチプロセスでpandas.read_csvをmapで実行してpandas.concatで結合

テスト②のmapを並列処置(マルチプロセス)で実行します。

#pandas.read_csv map multiprocessing
import time
import glob
import numpy as np 
import pandas as pd
import os
from multiprocessing import Pool

#map_multiprocessing(pd.concat)
def readcsv_map_multi(fileslist):
    p = Pool(os.cpu_count())
    df = pd.concat(p.map(pdreadcsv, fileslist))
    p.close()
    return df


#csv1個読み込み(map関数用)
def pdreadcsv(csv_path):
    return pd.read_csv(csv_path)


if __name__ == "__main__":
  
    allfiles = sorted(glob.glob('*.csv', recursive=True))

    start = time.time()
    df = readcsv_map_multi(allfiles)
    process_time = time.time() - start
    print('csv読み込み時間:{:.3f}s'.format(process_time))

実行結果

csv読み込み時間:5.630s

5.630sでした。並列処理によりテスト②より倍以上高速になりました。

並列処理なので当たり前ですがCPU率が大幅に上がります(100%近くなりました)。CPU使用率を気にする場合はテスト③の方法が無難です。

テスト⑤マルチプロセスでpandas.read_csvを内包表記で実行してpandas.concatで結合

テスト③の内包表記を並列処理(マルチプロセス)で実行します。

並列処理にはJoblibライブラリを使用します。

pipにてインストールしておきます。

pip install joblib

#pandas.read_csv list Joblib
import time
import glob
import numpy as np 
import pandas as pd
import os
from joblib import Parallel, delayed

def readcsv_list_multi(fileslist):
    df = pd.concat(Parallel(n_jobs=-1)( [delayed(pd.read_csv)(file) for file in fileslist] ))
    return df

if __name__ == "__main__":
    
    allfiles = sorted(glob.glob('*.csv', recursive=True))
    
    start = time.time()
    df = readcsv_list_multi(allfiles)
    process_time = time.time() - start
    print('csv読み込み時間:{:.3f}s'.format(process_time))

実行結果

csv読み込み時間:5.573s

5.573sでした。テスト④のmap並列処理とほぼ変わりません。これも誤差の範囲だと思います。

テスト⑤はテスト④に比べて新規にJoblibライブラリのインストールが必要なので少し面倒です。

そのため、以降はmapでの並列処理について検討していきます。

テスト⑥マルチプロセスでpandas.read_csvをmapで実行してnumpy.vstackで結合

データの結合はpnadas.concatよりもnumpy.vstackの方が高速に処理できます。

#readcsv_pandas_np.vstack map multi
import time
import glob
import numpy as np 
import pandas as pd
import os
from multiprocessing import Pool

#map_multiprocessing(np.vstack)
def readcsv_map_multi_npvstack(fileslist):
    p = Pool(os.cpu_count())
    comb_np_array = np.vstack(p.map(pdreadcsv_np_array, fileslist))
    df = pd.DataFrame(comb_np_array)    
    p.close()
    
    return df

def pdreadcsv_np_array(csv_path):
    df = pd.read_csv(csv_path)
    np_array = df.values
    return np_array
  
  
if __name__ == "__main__":
    
    allfiles = sorted(glob.glob('*.csv', recursive=True))  
    
    start = time.time()
    df = readcsv_map_multi_npvstack(allfiles)
    process_time = time.time() - start
    print('csv読み込み時間:{:.3f}s'.format(process_time))

実行結果

csv読み込み時間:5.263s

5.263sでした。テスト④よりすこし高速になりました。

ただし注意点として、一旦dataframeからnumpyに変換しているのでカラム名が消失しています。

カラム名が必要な場合は別途カラム名を再割り付けすつ処理を忘れずに。

テスト⑦マルチプロセスでdd.read_csvをmapで実行してnumpy.vstackで結合(参考)

参考にCSVファイルの高速読み込みが可能なdaskライブラリを使った方法も確認します。

#readcsv_dask_map_multi
import time
import glob
import numpy as np 
import pandas as pd
import os
from multiprocessing import Pool
import dask.dataframe as dd

#readcsv_dask_map_multi
def readcsv_dask_map_multi(fileslist):
    p = Pool(os.cpu_count())
    df = pd.concat(p.map(ddreadcsv, fileslist))
    p.close()
    return df

# 並列化読み込み関数 dd
def ddreadcsv(csv_path):
    return dd.read_csv(csv_path).compute()

  
if __name__ == "__main__":
    
    allfiles = sorted(glob.glob('*.csv', recursive=True))
    
    start = time.time()
    df = readcsv_dask_map_multi(allfiles)
    process_time = time.time() - start
    print('csv読み込み時間:{:.3f}s'.format(process_time))

実行結果

csv読み込み時間:27.490s

27.490sでした。テスト⑥より5倍以上遅くなりました。

daskでのcsv読み込みはサイズの大きいcsvには有効ですが、今回のようにサイズの小さなcsvを複数読み込む場合にはオーバーヘッドが大きく不利になるようです。

テスト⑧io.StringIOをmapで実行してpd.read_csvで読み込み(参考)

参考にopenとio.StringIOを使った方法も確認します。

#io.StringIO(map)
import time
import glob
import numpy as np 
import pandas as pd
import io

def read_csv_files_iostring(fileslist):
    headerskiplist = [False] + [True]*(len(fileslist)-1)
    alldata = ''.join(map(csvopen, fileslist, headerskiplist))
    df = pd.read_csv(io.StringIO(alldata))
    return df

def csvopen(file,headerskip=True):
    with open(file) as f:
        if headerskip: next(f)  #1行目(headerを)Skip
        data_temp = f.read()
    return data_temp 


if __name__ == "__main__":
    
    allfiles = sorted(glob.glob('*.csv', recursive=True))
    
    start = time.time()
    df = read_csv_files_iostring(allfiles)
    process_time = time.time() - start
    print('csv読み込み時間:{:.3f}s'.format(process_time))

実行結果

csv読み込み時間:12.671s

12.671sでした。テスト③より15%ほど早くなりました。

シングルプロセスの場合はこれが一番早いかもしれません。

まとめ

大量のcsvファイルを高速に読み込む方法を検討しました。

今回紹介した方法が必ずしもベストではなく、csvファイルのサイズとファイル数によって読み込み速度は異なってきます。

単純なfor文は想定通り遅く、mapやリスト内包表記を使用することで速度アップができました。

また並列処理することでより高速に読み込むことができました。ただし並列処理はCPU使用率が大幅UPするデメリットもあるので注意が必要です。

-python
-, ,

Copyright© Program as Life , 2023 All Rights Reserved Powered by AFFINGER5.