diff --git a/.github/workflows/regression.yml b/.github/workflows/regression.yml index 53a7684e..33e3e4bc 100644 --- a/.github/workflows/regression.yml +++ b/.github/workflows/regression.yml @@ -17,7 +17,7 @@ jobs: strategy: fail-fast: false matrix: - solution: [data.table, collapse, dplyr, pandas, pydatatable, spark, juliadf, juliads, polars, arrow, duckdb, duckdb-latest, datafusion] + solution: [data.table, collapse, dplyr, pandas, modin, pydatatable, spark, juliadf, juliads, polars, arrow, duckdb, duckdb-latest, datafusion] name: Regression Tests solo solutions runs-on: ubuntu-20.04 env: @@ -91,7 +91,7 @@ jobs: name: ${{ matrix.solution }}-out.zip path: ${{ matrix.solution }}-out.zip if-no-files-found: error - + regression-test-benchmark-runner-all-solutions: needs: regression-test-benchmark-runner-solo-solutions name: Regression Tests all solutions diff --git a/.gitignore b/.gitignore index 67623669..53f4251d 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ metastore_db/* *.md5 .Rproj.user .Rhistory +miniconda db-benchmark.Rproj */REVISION token diff --git a/_benchplot/benchplot-dict.R b/_benchplot/benchplot-dict.R index 6ac2df8a..6e14a8c6 100644 --- a/_benchplot/benchplot-dict.R +++ b/_benchplot/benchplot-dict.R @@ -35,6 +35,7 @@ solution.dict = {list( "data.table" = list(name=c(short="data.table", long="data.table"), color=c(strong="blue", light="#7777FF")), "dplyr" = list(name=c(short="dplyr", long="dplyr"), color=c(strong="red", light="#FF7777")), "pandas" = list(name=c(short="pandas", long="pandas"), color=c(strong="green4", light="#77FF77")), + "modin" = list(name=c(short="modin", long="modin"), color=c(strong="blue4", light="#7799ff")), "pydatatable" = list(name=c(short="pydatatable", long="(py)datatable"), color=c(strong="darkorange", light="orange")), "spark" = list(name=c(short="spark", long="spark"), color=c(strong="#8000FFFF", light="#CC66FF")), "dask" = list(name=c(short="dask", long="dask"), color=c(strong="slategrey", light="lightgrey")), 
@@ -115,6 +116,18 @@ groupby.syntax.dict = {list( "regression v1 v2 by id2 id4" = "DF[['id2','id4','v1','v2']].groupby(['id2','id4'], as_index=False, sort=False, observed=True, dropna=False).apply(lambda x: x['v1'].corr(x['v2'])**2).rename(columns={None: 'r2'})", "sum v3 count by id1:id6" = "DF.groupby(['id1','id2','id3','id4','id5','id6'], as_index=False, sort=False, observed=True, dropna=False).agg({'v3':'sum', 'v1':'size'})" )}, + "modin" = {c( + "sum v1 by id1" = "DF.groupby('id1', as_index=False, sort=False, observed=True, dropna=False).agg({'v1':'sum'})", + "sum v1 by id1:id2" = "DF.groupby(['id1','id2'], as_index=False, sort=False, observed=True, dropna=False).agg({'v1':'sum'})", + "sum v1 mean v3 by id3" = "DF.groupby('id3', as_index=False, sort=False, observed=True, dropna=False).agg({'v1':'sum', 'v3':'mean'})", + "mean v1:v3 by id4" = "DF.groupby('id4', as_index=False, sort=False, observed=True, dropna=False).agg({'v1':'mean', 'v2':'mean', 'v3':'mean'})", + "sum v1:v3 by id6" = "DF.groupby('id6', as_index=False, sort=False, observed=True, dropna=False).agg({'v1':'sum', 'v2':'sum', 'v3':'sum'})", + "median v3 sd v3 by id4 id5" = "DF.groupby(['id4','id5'], as_index=False, sort=False, observed=True, dropna=False).agg({'v3': ['median','std']})", + "max v1 - min v2 by id3" = "DF.groupby('id3', as_index=False, sort=False, observed=True, dropna=False).agg({'v1':'max', 'v2':'min'}).assign(range_v1_v2=lambda x: x['v1']-x['v2'])[['id3','range_v1_v2']]", + "largest two v3 by id6" = "DF.groupby('id6', sort=False, observed=True)['v3'].nlargest(2).reset_index()[['id6', 'v3']]", + "regression v1 v2 by id2 id4" = "query('SELECT id2, id4, POWER(CORR(v1, v2), 2) AS r2 FROM df GROUP BY id2, id4;', df=x)", + "sum v3 count by id1:id6" = "DF.groupby(['id1','id2','id3','id4','id5','id6'], as_index=False, sort=False, observed=True, dropna=False).agg({'v3':'sum', 'v1':'count'})" + )}, "pydatatable" = {c( "sum v1 by id1" = "DT[:, {'v1': sum(f.v1)}, by(f.id1)]", "sum v1 by id1:id2" 
= "DT[:, {'v1': sum(f.v1)}, by(f.id1, f.id2)]", @@ -253,6 +266,7 @@ groupby.syntax.dict = {list( "data.table" = list(), "dplyr" = list(), "pandas" = list(), + "modin" = list(), "pydatatable" = list(), "spark" = list("not yet implemented: SPARK-26589" = "median v3 sd v3 by id4 id5"), "dask" = list("not yet implemented: dask#4362" = "median v3 sd v3 by id4 id5"), @@ -281,6 +295,8 @@ groupby.data.exceptions = {list( "pandas" = {list( "out of memory" = c("G1_1e9_1e2_0_0","G1_1e9_1e1_0_0","G1_1e9_2e0_0_0","G1_1e9_1e2_0_1") # read_csv #9 )}, + "modin" = {list( + )}, "pydatatable" = {list( "csv reader NAs bug: datatable#2808" = c("G1_1e9_1e2_5_0") )}, @@ -385,6 +401,13 @@ join.syntax.dict = {list( "medium inner on factor" = "DF.merge(medium, on='id5')", "big inner on int" = "DF.merge(big, on='id3')" )}, + "modin" = {c( + "small inner on int" = "DF.merge(small, on='id1')", + "medium inner on int" = "DF.merge(medium, on='id2')", + "medium outer on int" = "DF.merge(medium, how='left', on='id2')", + "medium inner on factor" = "DF.merge(medium, on='id5')", + "big inner on int" = "DF.merge(big, on='id3')" + )}, "pydatatable" = {c( "small inner on int" = "y.key = 'id1'; DT[:, :, join(y)][isfinite(f.v2), :]", "medium inner on int" = "y.key = 'id2'; DT[:, :, join(y)][isfinite(f.v2), :]", @@ -447,6 +470,7 @@ join.query.exceptions = {list( "data.table" = list(), "dplyr" = list(), "pandas" = list(), + "modin" = list(), "pydatatable" = list(), "spark" = list(), "dask" = list(), @@ -471,6 +495,9 @@ join.data.exceptions = {list( "pandas" = {list( "out of memory" = c("J1_1e9_NA_0_0","J1_1e9_NA_5_0","J1_1e9_NA_0_1") # read_csv )}, + "modin" = {list( + "out of memory" = c("J1_1e9_NA_0_0","J1_1e9_NA_5_0","J1_1e9_NA_0_1") + )}, "pydatatable" = {list( "csv reader NAs bug: datatable#2808" = "J1_1e9_NA_5_0", "out of memory" = c("J1_1e9_NA_0_0","J1_1e9_NA_0_1") # q5 out of memory due to a deep copy diff --git a/_control/solutions.csv b/_control/solutions.csv index c96f07cf..872bb3b3 100644 --- 
a/_control/solutions.csv +++ b/_control/solutions.csv @@ -11,6 +11,8 @@ dplyr,groupby2014 pandas,groupby pandas,join pandas,groupby2014 +modin,groupby +modin,join pydatatable,groupby pydatatable,join spark,groupby diff --git a/_utils/repro.sh b/_utils/repro.sh old mode 100644 new mode 100755 index a8df441f..2ce011c9 --- a/_utils/repro.sh +++ b/_utils/repro.sh @@ -31,8 +31,27 @@ cd pydatatable virtualenv py-pydatatable --python=/usr/bin/python3.10 cd ../pandas virtualenv py-pandas --python=/usr/bin/python3.10 +################# +# Install modin # +################# cd ../modin -virtualenv py-modin --python=/usr/bin/python3.10 +curl -o install_miniconda.sh -L https://repo.anaconda.com/miniconda/Miniconda3-py311_23.9.0-0-Linux-x86_64.sh && \ + sh install_miniconda.sh -u -b -p ./py-modin && \ + rm -f install_miniconda.sh + +source ./py-modin/bin/activate +conda install -y conda-libmamba-solver + +conda create --name modin -y +conda activate modin +echo "conda activate modin" >> ./py-modin/bin/activate + +# install binaries +conda install -y -c conda-forge modin-hdk --solver=libmamba + +conda deactivate +conda deactivate + cd .. 
@@ -45,8 +64,9 @@ python3 -m pip install --upgrade pandas deactivate source ./modin/py-modin/bin/activate -python3 -m pip install --upgrade modin -deactivate +conda update modin-hdk -y -c conda-forge --solver=libmamba +conda deactivate +conda deactivate source ./pydatatable/py-pydatatable/bin/activate python3 -m pip install --upgrade git+https://github.com/h2oai/datatable @@ -72,7 +92,7 @@ mv G1_1e7_1e2_0_0.csv data/ echo "Changing run.conf and _control/data.csv to run only groupby at 0.5GB" cp run.conf run.conf.original sed -i 's/groupby join groupby2014/groupby/g' run.conf -sed -i 's/data.table dplyr pandas pydatatable spark dask clickhouse polars arrow duckdb/data.table dplyr duckdb/g' run.conf +sed -i 's/data.table dplyr pandas modin pydatatable spark dask clickhouse polars arrow duckdb/data.table dplyr duckdb/g' run.conf sed -i 's/DO_PUBLISH=true/DO_PUBLISH=false/g' run.conf # set sizes diff --git a/modin/groupby-modin.py b/modin/groupby-modin.py index d4e45a79..93a5115e 100755 --- a/modin/groupby-modin.py +++ b/modin/groupby-modin.py @@ -5,19 +5,33 @@ import os import gc import timeit + +# Set up HDK backend +os.environ["MODIN_ENGINE"] = "native" +os.environ["MODIN_STORAGE_FORMAT"] = "hdk" +os.environ["MODIN_EXPERIMENTAL"] = "True" +os.environ["MODIN_CPUS"] = "40" + + import modin as modin import modin.pandas as pd -import ray +from modin.utils import execute -exec(open("./_helpers/helpers.py").read()) -ver = modin.__version__ +def init_modin_on_hdk(): + """Modin on HDK warmup before benchmarking for calcite""" + from modin.experimental.sql import query + + data = {"a": [1, 2, 3]} + df = pd.DataFrame(data) + query("SELECT * FROM df", df=df) -ray.init(runtime_env={'env_vars': {'__MODIN_AUTOIMPORT_PANDAS__': '1'}}) -warnings.filterwarnings('ignore') -os.environ["MODIN_ENGINE"] = "ray" +init_modin_on_hdk() +exec(open("./_helpers/helpers.py").read()) + +ver = modin.__version__ git = "" task = "groupby" solution = "modin" @@ -25,297 +39,311 @@ cache = "TRUE" 
on_disk = "FALSE" -data_name = os.environ['SRC_DATANAME'] -src_grp = os.path.join("data", data_name+".csv") +data_name = os.environ["SRC_DATANAME"] +src_grp = os.path.join("data", data_name + ".csv") print("loading dataset %s" % data_name, flush=True) -x = pd.read_csv(src_grp, dtype={'id1':'category', 'id2':'category', 'id3':'category'}) -print(len(x.index), flush=True) +x = pd.read_csv( + src_grp, + dtype={ + "id1": "category", + "id2": "category", + "id3": "category", + **{n: "int32" for n in ["id4", "id5", "id6", "v1", "v2"]}, + "v3": "float64", + }, +) +# To trigger non-lazy loading +execute(x, trigger_hdk_import=True) + +gb_params = dict(as_index=False, sort=False, observed=True) task_init = timeit.default_timer() print("grouping...", flush=True) -question = "sum v1 by id1" # q1 +question = "sum v1 by id1" # q1 gc.collect() t_start = timeit.default_timer() -ans = x.groupby(['id1'], observed=True).agg({'v1':'sum'}) -ans.reset_index(inplace=True) # #68 +ans = x.groupby(["id1"], **gb_params).agg({"v1": "sum"}) +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['v1'].sum()] +chk = [ans["v1"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby(['id1'], observed=True).agg({'v1':'sum'}) -ans.reset_index(inplace=True) +ans = x.groupby(["id1"], **gb_params).agg({"v1": "sum"}) +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['v1'].sum()] +chk = [ans["v1"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], 
question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans -question = "sum v1 by id1:id2" # q2 +question = "sum v1 by id1:id2" # q2 gc.collect() t_start = timeit.default_timer() -ans = x.groupby(['id1','id2'], observed=True).agg({'v1':'sum'}) -ans.reset_index(inplace=True) +ans = x.groupby(["id1", "id2"], **gb_params).agg({"v1": "sum"}) +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['v1'].sum()] +chk = [ans["v1"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby(['id1','id2'], observed=True).agg({'v1':'sum'}) -ans.reset_index(inplace=True) +ans = x.groupby(["id1", "id2"], **gb_params).agg({"v1": "sum"}) +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['v1'].sum()] +chk = [ans["v1"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans -question = "sum v1 mean v3 by id3" # q3 +question = "sum v1 mean v3 by id3" # q3 gc.collect() t_start = timeit.default_timer() -ans = x.groupby(['id3'], 
observed=True).agg({'v1':'sum', 'v3':'mean'}) -ans.reset_index(inplace=True) +ans = x.groupby(["id3"], **gb_params).agg({"v1": "sum", "v3": "mean"}) +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v3'].sum()] +chk = [ans["v1"].sum(), ans["v3"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby(['id3'], observed=True).agg({'v1':'sum', 'v3':'mean'}) -ans.reset_index(inplace=True) +ans = x.groupby(["id3"], **gb_params).agg({"v1": "sum", "v3": "mean"}) +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v3'].sum()] +chk = [ans["v1"].sum(), ans["v3"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans -question = "mean v1:v3 by id4" # q4 +question = "mean v1:v3 by id4" # q4 gc.collect() t_start = timeit.default_timer() -ans = x.groupby(['id4'], observed=True).agg({'v1':'mean', 'v2':'mean', 'v3':'mean'}) -ans.reset_index(inplace=True) +ans = x.groupby(["id4"], **gb_params).agg({"v1": "mean", "v2": "mean", "v3": "mean"}) +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum(), 
ans['v3'].sum()] +chk = [ans["v1"].sum(), ans["v2"].sum(), ans["v3"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby(['id4'], observed=True).agg({'v1':'mean', 'v2':'mean', 'v3':'mean'}) -ans.reset_index(inplace=True) +ans = x.groupby(["id4"], **gb_params).agg({"v1": "mean", "v2": "mean", "v3": "mean"}) +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum(), ans['v3'].sum()] +chk = [ans["v1"].sum(), ans["v2"].sum(), ans["v3"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans -question = "sum v1:v3 by id6" # q5 +question = "sum v1:v3 by id6" # q5 gc.collect() t_start = timeit.default_timer() -ans = x.groupby(['id6'], observed=True).agg({'v1':'sum', 'v2':'sum', 'v3':'sum'}) -ans.reset_index(inplace=True) +ans = x.groupby(["id6"], **gb_params).agg({"v1": "sum", "v2": "sum", "v3": "sum"}) +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum(), ans['v3'].sum()] +chk = [ans["v1"].sum(), ans["v2"].sum(), ans["v3"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], 
solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby(['id6'], observed=True).agg({'v1':'sum', 'v2':'sum', 'v3':'sum'}) -ans.reset_index(inplace=True) +ans = x.groupby(["id6"], **gb_params).agg({"v1": "sum", "v2": "sum", "v3": "sum"}) +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['v1'].sum(), ans['v2'].sum(), ans['v3'].sum()] +chk = [ans["v1"].sum(), ans["v2"].sum(), ans["v3"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans -question = "median v3 sd v3 by id4 id5" # q6 +question = "median v3 sd v3 by id4 id5" # q6 gc.collect() t_start = timeit.default_timer() -ans = x.groupby(['id4','id5'], observed=True).agg({'v3': ['median','std']}) -ans.reset_index(inplace=True) +ans = x.groupby(["id4", "id5"], **gb_params).agg({"v3": ["median", "std"]}) +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()] +chk = [ans[("v3", "median")].sum(), ans[("v3", "std")].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby(['id4','id5'], 
observed=True).agg({'v3': ['median','std']}) -ans.reset_index(inplace=True) +ans = x.groupby(["id4", "id5"], **gb_params).agg({"v3": ["median", "std"]}) +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()] +chk = [ans[("v3", "median")].sum(), ans[("v3", "std")].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans -question = "max v1 - min v2 by id3" # q7 +question = "max v1 - min v2 by id3" # q7 gc.collect() t_start = timeit.default_timer() -ans = x.groupby(['id3'], observed=True).agg({'v1': 'max', 'v2': 'min'}).assign(range_v1_v2=lambda x: x['v1'] - x['v2'])[['range_v1_v2']] -ans.reset_index(inplace=True) +ans = x.groupby(["id3"], **gb_params).agg({"v1": "max", "v2": "min"}).assign(range_v1_v2=lambda x: x["v1"] - x["v2"])[["id3", "range_v1_v2"]] +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['range_v1_v2'].sum()] +chk = [ans["range_v1_v2"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby(['id3'], observed=True).agg({'v1': 'max', 'v2': 'min'}).assign(range_v1_v2=lambda x: x['v1'] - x['v2'])[['range_v1_v2']] -ans.reset_index(inplace=True) +ans = x.groupby(["id3"], 
**gb_params).agg({"v1": "max", "v2": "min"}).assign(range_v1_v2=lambda x: x["v1"] - x["v2"])[["id3", "range_v1_v2"]] +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['range_v1_v2'].sum()] +chk = [ans["range_v1_v2"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans -question = "largest two v3 by id6" # q8 +question = "largest two v3 by id6" # q8 gc.collect() t_start = timeit.default_timer() -ans = x[['id6','v3']].sort_values('v3', ascending=False).groupby(['id6'], observed=True).head(2) -ans.reset_index(drop=True, inplace=True) +ans = x.groupby("id6", sort=False, observed=True)["v3"].nlargest(2).reset_index()[["id6", "v3"]] +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['v3'].sum()] +chk = [ans["v3"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x[['id6','v3']].sort_values('v3', ascending=False).groupby(['id6'], observed=True).head(2) -ans.reset_index(drop=True, inplace=True) +ans = x.groupby("id6", sort=False, observed=True)["v3"].nlargest(2).reset_index()[["id6", "v3"]] +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = 
[ans['v3'].sum()] +chk = [ans["v3"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans -question = "regression v1 v2 by id2 id4" # q9 -#ans = x[['id2','id4','v1','v2']].groupby(['id2','id4']).corr().iloc[0::2][['v2']]**2 # slower, 76s vs 47s on 1e8 1e2 +question = "regression v1 v2 by id2 id4" # q9 +# ans = x[['id2','id4','v1','v2']].groupby(['id2','id4']).corr().iloc[0::2][['v2']]**2 # slower, 76s vs 47s on 1e8 1e2 gc.collect() t_start = timeit.default_timer() -ans = x[['id2','id4','v1','v2']].groupby(['id2','id4'], observed=True).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2})) -ans.reset_index(inplace=True) +from modin.experimental.sql import query + +ans = query("SELECT id2, id4, POWER(CORR(v1, v2), 2) AS r2 FROM df GROUP BY id2, id4;", df=x) +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['r2'].sum()] +chk = [ans["r2"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x[['id2','id4','v1','v2']].groupby(['id2','id4'], observed=True).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2})) -ans.reset_index(inplace=True) +ans = query("SELECT id2, id4, POWER(CORR(v1, v2), 2) AS r2 FROM df GROUP BY id2, id4;", df=x) +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = 
memory_usage() t_start = timeit.default_timer() -chk = [ans['r2'].sum()] +chk = [ans["r2"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans -question = "sum v3 count by id1:id6" # q10 +question = "sum v3 count by id1:id6" # q10 gc.collect() t_start = timeit.default_timer() -ans = x.groupby(['id1','id2','id3','id4','id5','id6'], observed=True).agg({'v3':'sum', 'v1':'count'}) -ans.reset_index(inplace=True) +ans = x.groupby(["id1", "id2", "id3", "id4", "id5", "id6"], **gb_params).agg({"v3": "sum", "v1": "count"}) +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['v3'].sum(), ans['v1'].sum()] +chk = [ans["v3"].sum(), ans["v1"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby(['id1','id2','id3','id4','id5','id6'], observed=True).agg({'v3':'sum', 'v1':'count'}) -ans.reset_index(inplace=True) +ans = x.groupby(["id1", "id2", "id3", "id4", "id5", "id6"], **gb_params).agg({"v3": "sum", "v1": "count"}) +execute(ans) print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['v3'].sum(), ans['v1'].sum()] +chk = [ans["v3"].sum(), ans["v1"].sum()] chkt = timeit.default_timer() - t_start write_log(task=task, data=data_name, in_rows=x.shape[0], 
question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans -print("grouping finished, took %0.fs" % (timeit.default_timer()-task_init), flush=True) +print("grouping finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True) exit(0) diff --git a/modin/join-modin.py b/modin/join-modin.py index b7c69650..3c4ce57e 100755 --- a/modin/join-modin.py +++ b/modin/join-modin.py @@ -5,65 +5,231 @@ import os import gc import timeit + +# Set up HDK backend +os.environ["MODIN_ENGINE"] = "native" +os.environ["MODIN_STORAGE_FORMAT"] = "hdk" +os.environ["MODIN_EXPERIMENTAL"] = "True" +os.environ["MODIN_CPUS"] = "40" + + +import modin as modin import modin.pandas as pd +from modin.utils import execute + -exec(open("./helpers.py").read()) +def init_modin_on_hdk(): + """Modin on HDK warmup before benchmarking for calcite""" + from modin.experimental.sql import query -src_x = os.environ['SRC_X_LOCAL'] -src_y = os.environ['SRC_Y_LOCAL'] + data = {"a": [1, 2, 3]} + df = pd.DataFrame(data) + query("SELECT * FROM df", df=df) -ver = "" #pd.__version__ + +init_modin_on_hdk() + +exec(open("./_helpers/helpers.py").read()) + +ver = modin.__version__ git = "" task = "join" -question = "inner join" -l = [os.path.basename(src_x), os.path.basename(src_y)] -data_name = '-'.join(l) solution = "modin" fun = "merge" cache = "TRUE" +on_disk = "FALSE" + -print("loading datasets...") +data_name = os.environ["SRC_DATANAME"] +src_jn_x = os.path.join("data", data_name + ".csv") +y_data_name = join_to_tbls(data_name) +src_jn_y = [ + os.path.join("data", y_data_name[0] + ".csv"), + os.path.join("data", y_data_name[1] + ".csv"), + os.path.join("data", y_data_name[2] + ".csv"), +] +if len(src_jn_y) != 3: + raise Exception("Something went wrong in preparing files used for join") 
-x = pd.read_csv(os.path.basename(src_x)) -y = pd.read_csv(os.path.basename(src_y)) -print("joining...") +print( + "loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[1] + ", " + y_data_name[2], + flush=True, +) + +x = pd.read_csv( + src_jn_x, + dtype={ + **{n: "int32" for n in ["id1", "id2", "id3"]}, + **{n: "category" for n in ["id4", "id5", "id6"]}, + "v1": "float64", + }, +) + +small = pd.read_csv(src_jn_y[0], dtype={"id1": "int32", "id4": "category", "v2": "float64"}) +medium = pd.read_csv( + src_jn_y[1], + dtype={ + **{n: "int32" for n in ["id1", "id2"]}, + **{n: "category" for n in ["id4", "id5"]}, + "v2": "float64", + }, +) +big = pd.read_csv( + src_jn_y[2], + dtype={ + **{n: "int32" for n in ["id1", "id2", "id3"]}, + **{n: "category" for n in ["id4", "id5", "id6"]}, + "v2": "float64", + }, +) + +# To trigger non-lazy loading +[execute(df, trigger_hdk_import=True) for df in [x, small, medium, big]] + +task_init = timeit.default_timer() +print("joining...", flush=True) + +question = "small inner on int" # q1 +gc.collect() +t_start = timeit.default_timer() +ans = x.merge(small, on="id1") +execute(ans) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans["v1"].sum(), ans["v2"].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.merge(small, on="id1") +execute(ans) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans["v1"].sum(), ans["v2"].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], 
question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "medium inner on int" # q2 +gc.collect() +t_start = timeit.default_timer() +ans = x.merge(medium, on="id2") +execute(ans) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans["v1"].sum(), ans["v2"].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.merge(medium, on="id2") +execute(ans) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans["v1"].sum(), ans["v2"].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans -# NotImplementedError: To contribute to Pandas on Ray, please visit github.com/modin-project/modin +question = "medium outer on int" # q3 +gc.collect() +t_start = timeit.default_timer() +ans = x.merge(medium, how="left", on="id2") +execute(ans) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans["v1"].sum(), ans["v2"].sum()] +chkt = timeit.default_timer() - t_start 
+write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans gc.collect() t_start = timeit.default_timer() -ans = x.merge(y, how='inner', on='KEY') -print(ans.shape) +ans = x.merge(medium, how="left", on="id2") +execute(ans) +print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['X2'].sum(), ans['Y2'].sum()] +chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt) +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) del ans +question = "medium inner on factor" # q4 +gc.collect() +t_start = timeit.default_timer() +ans = x.merge(medium, on="id5") +execute(ans) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans["v1"].sum(), ans["v2"].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans gc.collect() t_start = timeit.default_timer() -ans = x.merge(y, how='inner', on='KEY') 
-print(ans.shape) +ans = x.merge(medium, on="id5") +execute(ans) +print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['X2'].sum(), ans['Y2'].sum()] +chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt) +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) del ans +question = "big inner on int" # q5 gc.collect() t_start = timeit.default_timer() -ans = x.merge(y, how='inner', on='KEY') -print(ans.shape) +ans = x.merge(big, on="id3") +execute(ans) +print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() -chk = [ans['X2'].sum(), ans['Y2'].sum()] +chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=3, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt) +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans +gc.collect() +t_start = timeit.default_timer() +ans = x.merge(big, on="id3") +execute(ans) +print(ans.shape, flush=True) +t = 
timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans["v1"].sum(), ans["v2"].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +print("joining finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True) exit(0) diff --git a/modin/setup-modin.sh b/modin/setup-modin.sh index 4ef46d87..34a9b32f 100755 --- a/modin/setup-modin.sh +++ b/modin/setup-modin.sh @@ -1,16 +1,22 @@ #!/bin/bash set -e -virtualenv modin/py-modin --python=python3 -source modin/py-modin/bin/activate +curl -o install_miniconda.sh -L https://repo.anaconda.com/miniconda/Miniconda3-py311_23.9.0-0-Linux-x86_64.sh && \ + sh install_miniconda.sh -u -b -p ./modin/py-modin && \ + rm -f install_miniconda.sh + +eval source ./modin/py-modin/bin/activate +conda install -y conda-libmamba-solver + +conda create --name modin -y +conda activate modin +echo "conda activate modin" >> ./modin/py-modin/bin/activate # install binaries -python3 -m pip install --upgrade modin[all] +conda install -y -c conda-forge modin-hdk --solver=libmamba # check -python3 -import modin -modin.__version__ -quit() +python3 -c "import modin; print(modin.__version__)" -deactivate +conda deactivate +conda deactivate diff --git a/modin/upg-modin.sh b/modin/upg-modin.sh index 80ca5591..7c6ae50d 100755 --- a/modin/upg-modin.sh +++ b/modin/upg-modin.sh @@ -4,5 +4,4 @@ set -e echo 'upgrading modin...' 
source ./modin/py-modin/bin/activate - -python -m pip install --upgrade modin[all] > /dev/null +conda update modin-hdk -y -c conda-forge --solver=libmamba diff --git a/run.conf b/run.conf index 14e0f435..5d98cbaf 100644 --- a/run.conf +++ b/run.conf @@ -1,7 +1,7 @@ # task, used in init-setup-iteration.R export RUN_TASKS="groupby join" # solution, used in init-setup-iteration.R -export RUN_SOLUTIONS="collapse data.table juliads juliadf dplyr pandas pydatatable spark dask clickhouse polars arrow duckdb duckdb-latest datafusion" +export RUN_SOLUTIONS="collapse data.table juliads juliadf dplyr pandas modin pydatatable spark dask clickhouse polars arrow duckdb duckdb-latest datafusion" # flag to upgrade tools, used in run.sh on init export DO_UPGRADE=false