Contents
- 1 はじめに
- 2 環境とテストデータ
- 3 評価
- 3.1 テスト①pandas.read_csvをforでループしてpandas.concatで結合
- 3.2 テスト②pandas.read_csvをmapで実行してpandas.concatで結合
- 3.3 テスト③pandas.read_csvをリスト内包表記で実行してpandas.concatで結合
- 3.4 テスト④マルチプロセスでpandas.read_csvをmapで実行してpandas.concatで結合
- 3.5 テスト⑤マルチプロセスでpandas.read_csvを内包表記で実行してpandas.concatで結合
- 3.6 テスト⑥マルチプロセスでpandas.read_csvをmapで実行してnumpy.vstackで結合
- 3.7 テスト⑦マルチプロセスでdd.read_csvをmapで実行してnumpy.vstackで結合(参考)
- 3.8 テスト⑧io.StringIOをmapで実行してpd.read_csvで読み込み(参考)
- 4 まとめ
はじめに
Pythonで大量のcsvファイルを読み込む方法について、読み込み速度の比較検討した備忘録です。
CSVファイル単体での読み込み方法は前回の記事をご覧ください。
先に結論から言うと、確認した範囲で大量のcsvファイルを読み込む高速に読み込む方法は、
シングルプロセスで実行する場合はテスト⑤の「pandas.read_csvをリスト内包表記で実行してpandas.concatで結合」する方法、
CPU使用率が大幅に上がっても良い場合、マルチプロセスで実行する方法のテスト⑥「pandas.read_csvをmapで実行してnumpy.vstackで結合」する方法となりました。
環境とテストデータ
・Windows 10 Home 64bit
- CPU Intel Core i5-8250U CPU@ 1.60GHz
- メモリ 8GB
- SSD 512GB
- python 3.6.5
テストデータは100行100列の乱数(0~1)でcsvファイルを1000個作成します。
テストデータ作成のコード
#テストデータ作成のコード
import numpy as np
import pandas as pd
#乱数作成
for i in range(1000):
testdata = np.random.rand(100,100) # 0〜1の乱数で 100x100 の行列を生成
df = pd.DataFrame(testdata) #dataframeに変換
filename = 'dammydata' + str(i).zfill(5) + '.csv'
df.to_csv(filename , index=False) #csvに保存
↓このようなcsvデータが1000個作成されました。これをテストデータとします。
1個のcsvファイルサイズは188 KB (193,079 バイト)です。
python実行の作業ディレクトリにcsvファイル1000個作成されます。予め新規フォルダ作成して作業ディレクトリを変更してから実行しましょう。
評価
csv読み込みコード
テスト①pandas.read_csvをforでループしてpandas.concatで結合
単体のcsvファイルの場合、numpy.loadtxtよりもpandas.read_csvの方が高速に読み込むことができます。
そこでまずは、単純にfor文を使ってpandas.read_csvを全csvファイル読み込む方法です。
#pandas.read_csv for_loop
import time
import glob
import numpy as np
import pandas as pd
#for_loop
def readcsv_for_loop(fileslist):
for i,file in enumerate(fileslist):
df_tmp = pd.read_csv(file)
if i == 0:
df = df_tmp
else:
df = pd.concat([df, df_tmp])
return df
if __name__ == "__main__":
allfiles = sorted(glob.glob('*.csv', recursive=True))
start = time.time()
df = readcsv_for_loop(allfiles)
process_time = time.time() - start
print('csv読み込み時間:{:.3f}s'.format(process_time))
実行結果
csv読み込み時間:46.762s
全csvファイルを読み込むのに46.762秒かかりました。この結果をベンチマークとします。
for文の処理速度は遅いので、次はforの代わりにmapを使用します。
テスト②pandas.read_csvをmapで実行してpandas.concatで結合
for文を使わずにmapにて高速化を図る。
#pandas.read_csv map
import time
import glob
import numpy as np
import pandas as pd
#map(pd.concat)
def readcsv_map(fileslist):
df = pd.concat(map(pdreadcsv, fileslist))
return df
#csv1個読み込み(map関数用)
def pdreadcsv(csv_path):
return pd.read_csv(csv_path)
if __name__ == "__main__":
allfiles = sorted(glob.glob('*.csv', recursive=True))
start = time.time()
df = readcsv_map(allfiles)
process_time = time.time() - start
print('csv読み込み時間:{:.3f}s'.format(process_time))
実行結果
csv読み込み時間:15.187s
15.187sでした。テスト①のfor文よりもmapを使う方が3倍ほど高速に読み込みできました。
テスト③pandas.read_csvをリスト内包表記で実行してpandas.concatで結合
mapの代わりにリスト内包表記を使ってみます。
#pandas.read_csv list comprehension
import time
import glob
import numpy as np
import pandas as pd
#map(pd.concat)
def readcsv_map(fileslist):
df = pd.concat([pd.read_csv(file) for file in fileslist])
return df
if __name__ == "__main__":
allfiles = sorted(glob.glob('*.csv', recursive=True))
start = time.time()
df = readcsv_map(allfiles)
process_time = time.time() - start
print('csv読み込み時間:{:.3f}s'.format(process_time))
実行結果
csv読み込み時間:14.982s
14.982sでした。テスト②とほぼ変わりません。誤差の範囲だと思います。
コードはテスト②よりすっきりしました。
テスト④マルチプロセスでpandas.read_csvをmapで実行してpandas.concatで結合
テスト②のmapを並列処置(マルチプロセス)で実行します。
#pandas.read_csv map multiprocessing
import time
import glob
import numpy as np
import pandas as pd
import os
from multiprocessing import Pool
#map_multiprocessing(pd.concat)
def readcsv_map_multi(fileslist):
p = Pool(os.cpu_count())
df = pd.concat(p.map(pdreadcsv, fileslist))
p.close()
return df
#csv1個読み込み(map関数用)
def pdreadcsv(csv_path):
return pd.read_csv(csv_path)
if __name__ == "__main__":
allfiles = sorted(glob.glob('*.csv', recursive=True))
start = time.time()
df = readcsv_map_multi(allfiles)
process_time = time.time() - start
print('csv読み込み時間:{:.3f}s'.format(process_time))
実行結果
csv読み込み時間:5.630s
5.630sでした。並列処理によりテスト②より倍以上高速になりました。
並列処理なので当たり前ですがCPU率が大幅に上がります(100%近くなりました)。CPU使用率を気にする場合はテスト③の方法が無難です。
テスト⑤マルチプロセスでpandas.read_csvを内包表記で実行してpandas.concatで結合
テスト③の内包表記を並列処理(マルチプロセス)で実行します。
並列処理にはJoblibライブラリを使用します。
pipにてインストールしておきます。
pip install joblib
#pandas.read_csv list Joblib
import time
import glob
import numpy as np
import pandas as pd
import os
from joblib import Parallel, delayed
def readcsv_list_multi(fileslist):
df = pd.concat(Parallel(n_jobs=-1)( [delayed(pd.read_csv)(file) for file in fileslist] ))
return df
if __name__ == "__main__":
allfiles = sorted(glob.glob('*.csv', recursive=True))
start = time.time()
df = readcsv_list_multi(allfiles)
process_time = time.time() - start
print('csv読み込み時間:{:.3f}s'.format(process_time))
実行結果
csv読み込み時間:5.573s
5.573sでした。テスト④のmap並列処理とほぼ変わりません。これも誤差の範囲だと思います。
テスト⑤はテスト④に比べて新規にJoblibライブラリのインストールが必要なので少し面倒です。
そのため、以降はmapでの並列処理について検討していきます。
テスト⑥マルチプロセスでpandas.read_csvをmapで実行してnumpy.vstackで結合
データの結合はpnadas.concatよりもnumpy.vstackの方が高速に処理できます。
#readcsv_pandas_np.vstack map multi
import time
import glob
import numpy as np
import pandas as pd
import os
from multiprocessing import Pool
#map_multiprocessing(np.vstack)
def readcsv_map_multi_npvstack(fileslist):
p = Pool(os.cpu_count())
comb_np_array = np.vstack(p.map(pdreadcsv_np_array, fileslist))
df = pd.DataFrame(comb_np_array)
p.close()
return df
def pdreadcsv_np_array(csv_path):
df = pd.read_csv(csv_path)
np_array = df.values
return np_array
if __name__ == "__main__":
allfiles = sorted(glob.glob('*.csv', recursive=True))
start = time.time()
df = readcsv_map_multi_npvstack(allfiles)
process_time = time.time() - start
print('csv読み込み時間:{:.3f}s'.format(process_time))
実行結果
csv読み込み時間:5.263s
5.263sでした。テスト④よりすこし高速になりました。
ただし注意点として、一旦dataframeからnumpyに変換しているのでカラム名が消失しています。
カラム名が必要な場合は別途カラム名を再割り付けすつ処理を忘れずに。
テスト⑦マルチプロセスでdd.read_csvをmapで実行してnumpy.vstackで結合(参考)
参考にCSVファイルの高速読み込みが可能なdaskライブラリを使った方法も確認します。
#readcsv_dask_map_multi
import time
import glob
import numpy as np
import pandas as pd
import os
from multiprocessing import Pool
import dask.dataframe as dd
#readcsv_dask_map_multi
def readcsv_dask_map_multi(fileslist):
p = Pool(os.cpu_count())
df = pd.concat(p.map(ddreadcsv, fileslist))
p.close()
return df
# 並列化読み込み関数 dd
def ddreadcsv(csv_path):
return dd.read_csv(csv_path).compute()
if __name__ == "__main__":
allfiles = sorted(glob.glob('*.csv', recursive=True))
start = time.time()
df = readcsv_dask_map_multi(allfiles)
process_time = time.time() - start
print('csv読み込み時間:{:.3f}s'.format(process_time))
実行結果
csv読み込み時間:27.490s
27.490sでした。テスト⑥より5倍以上遅くなりました。
daskでのcsv読み込みはサイズの大きいcsvには有効ですが、今回のようにサイズの小さなcsvを複数読み込む場合にはオーバーヘッドが大きく不利になるようです。
テスト⑧io.StringIOをmapで実行してpd.read_csvで読み込み(参考)
参考にopenとio.StringIOを使った方法も確認します。
#io.StringIO(map)
import time
import glob
import numpy as np
import pandas as pd
import io
def read_csv_files_iostring(fileslist):
headerskiplist = [False] + [True]*(len(fileslist)-1)
alldata = ''.join(map(csvopen, fileslist, headerskiplist))
df = pd.read_csv(io.StringIO(alldata))
return df
def csvopen(file,headerskip=True):
with open(file) as f:
if headerskip: next(f) #1行目(headerを)Skip
data_temp = f.read()
return data_temp
if __name__ == "__main__":
allfiles = sorted(glob.glob('*.csv', recursive=True))
start = time.time()
df = read_csv_files_iostring(allfiles)
process_time = time.time() - start
print('csv読み込み時間:{:.3f}s'.format(process_time))
実行結果
csv読み込み時間:12.671s
12.671sでした。テスト③より15%ほど早くなりました。
シングルプロセスの場合はこれが一番早いかもしれません。
まとめ
大量のcsvファイルを高速に読み込む方法を検討しました。
今回紹介した方法が必ずしもベストではなく、csvファイルのサイズとファイル数によって読み込み速度は異なってきます。
単純なfor文は想定通り遅く、mapやリスト内包表記を使用することで速度アップができました。
また並列処理することでより高速に読み込むことができました。ただし並列処理はCPU使用率が大幅UPするデメリットもあるので注意が必要です。