
Modules

do_commit(cwd)

Commits the benchmark results to the repository.

Parameters:

| Name | Type | Description | Default |
|:---|:---|:---|:---|
| `cwd` | `str` | The current working directory. | required |

Raises:

CalledProcessError: If any of the subprocess commands fail.
Source code in src/benchie/__init__.py
def do_commit(cwd):
    """
    Commits the benchmark results to the repository.

    Args:
        cwd (str): The current working directory.

    Raises:
        CalledProcessError: If any of the subprocess commands fail.
    """
    logger.info("Committing results")
    subprocess.run(["git", "add", "**/*benchmark.md"], check=True, cwd=cwd)
    subprocess.run(["pre-commit", "run", "--all-files"], check=False, cwd=cwd)
    subprocess.run(["git", "add", "**/*benchmark.md"], check=True, cwd=cwd)
    subprocess.run(["git", "commit", "-am", "benchmark solutions"], check=True, cwd=cwd)
    subprocess.run(["git", "pull", "--rebase"], check=True, cwd=cwd)
    subprocess.run(["git", "push"], check=True, cwd=cwd)

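A minimal usage sketch, assuming `do_commit` is importable from the package root (it is defined in `src/benchie/__init__.py`) and that updated `*benchmark.md` files already exist in the working tree:

```python
from pathlib import Path

from benchie import do_commit  # defined in src/benchie/__init__.py

# Stage, commit, rebase, and push the benchmark reports from the repository root.
do_commit(str(Path.cwd()))
```
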
main(output, data, force, commit, course_id, exercise_name, exercise_id, solutions, token, skip_fetch, skip_benchmark, subset, subset_data, disable_pretest, timeout, loop, loop_timeout, benchmark_options, docker_image, *args, **kwargs)

Main function for benchmarking and processing data.

Parameters:

| Name | Type | Description | Default |
|:---|:---|:---|:---|
| `output` | `str` | Path to the output directory. | required |
| `data` | `str` | Path to the data folder. | required |
| `force` | `bool` | Flag indicating whether to force benchmarking even if there are no new submissions. | required |
| `commit` | `bool` | Flag indicating whether to commit the changes. | required |
| `course_id` | `str` | Course ID. | required |
| `exercise_name` | `str` | Exercise name. | required |
| `exercise_id` | `str` | Exercise ID. | required |
| `solutions` | `str` | Path to the solutions directory. | required |
| `token` | `str` | Path to the token location. | required |
| `skip_fetch` | `bool` | Flag indicating whether to skip fetching new submissions. | required |
| `skip_benchmark` | `bool` | Flag indicating whether to skip benchmarking. | required |
| `subset` | `int` | Number of solutions to consider. | required |
| `subset_data` | `int` | Number of data files to consider. | required |
| `disable_pretest` | `bool` | Flag indicating whether to skip testing the correctness of the solutions. | required |
| `timeout` | `float` | Timeout value for benchmarking. | required |
| `loop` | `bool` | Flag indicating whether to run the benchmark in an infinite loop. | required |
| `loop_timeout` | `float` | Timeout value for the loop. | required |
| `benchmark_options` | `list[BenchmarkOption]` | List of benchmarking options. | required |
| `docker_image` | `str` | Docker image to use for benchmarking. | required |

Returns:

None
Source code in src/benchie/__init__.py
def main(
    output,
    data,
    force,
    commit,
    course_id,
    exercise_name,
    exercise_id,
    solutions,
    token,
    skip_fetch,
    skip_benchmark,
    subset,
    subset_data,
    disable_pretest,
    timeout,
    loop,
    loop_timeout,
    benchmark_options,
    docker_image,
    *args,
    **kwargs,
):
    """
    Main function for benchmarking and processing data.

    Args:
        output (str): Path to the output directory.
        data (str): Path to the data folder.
        force (bool): Flag indicating whether to force benchmarking even if there are no new submissions.
        commit (bool): Flag indicating whether to commit the changes.
        course_id (str): Course ID.
        exercise_name (str): Exercise name.
        exercise_id (str): Exercise ID.
        solutions (str): Path to the solutions directory.
        token (str): Path to the token location.
        skip_fetch (bool): Flag indicating whether to skip fetching new submissions.
        skip_benchmark (bool): Flag indicating whether to skip benchmarking.
        subset (int): Number of solutions to consider.
        subset_data (int): Number of data files to consider.
        disable_pretest (bool): Flag indicating whether to skip testing the correctness of the solutions.
        timeout (float): Timeout value for benchmarking.
        loop (bool): Flag indicating whether to run the benchmark in an infinite loop.
        loop_timeout (float): Timeout value for the loop.
        benchmark_options (list[BenchmarkOption]): List of benchmarking options.
        docker_image (str): Docker image to use for benchmarking.

    Returns:
        None
    """
    cwd: Path = Path.cwd().resolve()
    data = Path(data).resolve() / exercise_name
    if not data.exists():
        logger.error(f"Data folder {data} does not exist")
        return
    output = Path(output).resolve() / exercise_name
    output.mkdir(exist_ok=True, parents=True)
    solutions_path = Path(solutions).resolve() / exercise_name
    while True:
        if not skip_fetch:
            refreshed = refresh(
                # course id
                course_id,
                # exercise id
                exercise_id,
                # output directory
                solutions_path,
                # token location
                token,
            )
        else:
            refreshed = True
        logger.info(force)
        if force or refreshed:
            logger.info("New submissions or forced")
            # there is new data
            # benchmark(6, Path("reconstruction/J02459.1.6mers"), output=output)
            # benchmark(50, Path("reconstruction/J02459.1.50mers"), output=output)
            # make sure solutions are importable
            logger.debug(solutions_path)
            sys.path.append(str(solutions_path))

            # find folders or .py files
            all_solutions = [
                p for p in solutions_path.iterdir() if (p.is_dir() and p.name != "__pycache__") or p.suffix == ".py"
            ][:subset]
            logger.info(f"Found {len(all_solutions)} solutions.")
            valid_solutions = all_solutions

            data_paths: list[Path] = sorted(data.resolve().glob("data_*.py"))[:subset_data]
            if not data_paths:
                logger.info("No data to process")
                return
            for path in data_paths:
                if not valid_solutions:
                    logger.error("No valid solutions to benchmark.")
                    break
                assert path.exists(), f"Path {path} does not exist"
                logger.info(f"Testing on data {path.name}")
                output_folder_data = output / path.stem
                if not skip_benchmark:
                    valid_solutions = benchmark(
                        path,
                        subset=subset,
                        output=output_folder_data,
                        solutions=valid_solutions,
                        timeout=timeout,
                        disable_pretest=disable_pretest,
                        benchmark_options=benchmark_options,
                    )
                logger.info("Postprocess")
                postprocess_output(path, output_folder_data)
                if commit:
                    logger.info("Committing")
                    do_commit(cwd)
                else:
                    logger.info("Not committing")
        else:
            # no new data
            logger.info("No new submissions. Not Benchmarking")
            if loop:
                logger.info(f"Sleeping for {loop_timeout} seconds.")
                time.sleep(loop_timeout)
        if not loop:
            break

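A hedged invocation sketch; every path, ID, and token location below is a placeholder, and `benchmark_options` is passed as the options' string values because `benchmark()` checks membership against `BenchmarkOption.<NAME>.value`:

```python
from benchie import main
from benchie.benchmark import BenchmarkOption

main(
    output="output",                 # per-exercise results land in output/<exercise_name>/
    data="data",                     # expects data/<exercise_name>/data_*.py test files
    force=True,                      # benchmark even without new submissions
    commit=False,                    # skip do_commit()
    course_id="1234",                # placeholder
    exercise_name="reconstruction",  # placeholder
    exercise_id="5678",              # placeholder
    solutions="solutions",           # expects solutions/<exercise_name>/
    token="token.txt",               # placeholder token location
    skip_fetch=True,                 # use the solutions already on disk
    skip_benchmark=False,
    subset=10,                       # only the first 10 solutions
    subset_data=1,                   # only the first data file
    disable_pretest=False,           # pretest correctness before benchmarking
    timeout=60.0,
    loop=False,
    loop_timeout=0.0,
    benchmark_options=[BenchmarkOption.HYPERFINE.value],
    docker_image=None,
)
```
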
benchmark

BenchmarkOption

Bases: Enum

Benchmarking options.

Source code in src/benchie/benchmark.py
class BenchmarkOption(Enum):
    """Benchmarking options."""

    HYPERFINE = "hyperfine"
    SCALENE = "scalene"
    MEMRAY_TRACKER = "memray_tracker"
    MEMRAY_IMPORTS = "memray_imports"

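Note that `benchmark()` below tests membership with `BenchmarkOption.<NAME>.value`, so `benchmark_options` is effectively a list of the string values; a small sketch:

```python
from benchie.benchmark import BenchmarkOption

# Select hyperfine timing plus memray peak-memory tracking.
benchmark_options = [BenchmarkOption.HYPERFINE.value, BenchmarkOption.MEMRAY_TRACKER.value]
assert "hyperfine" in benchmark_options  # the form of the check used inside benchmark()
```
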
benchmark(testfile, output, solutions, timeout, disable_pretest, benchmark_options, subset=None, docker_image=None)

Perform benchmarking on submissions.

Parameters:

| Name | Type | Description | Default |
|:---|:---|:---|:---|
| `testfile` | `Path` | Path to the test file. | required |
| `output` | `Path` | Path to the output directory. | required |
| `solutions` | `List[Path]` | List of paths to the solutions. | required |
| `timeout` | `int` | Timeout value in seconds. | required |
| `disable_pretest` | `bool` | Flag indicating whether to skip testing the correctness of the solutions. | required |
| `benchmark_options` | `list[BenchmarkOption]` | List of benchmarking options. | required |
| `subset` | `str` | Subset of the test file to use. | `None` |
| `docker_image` | `str` | Docker image to use for benchmarking. | `None` |

Returns:

List[Path]: List of correct solutions.
Source code in src/benchie/benchmark.py
def benchmark(
    testfile,
    output,
    solutions,
    timeout,
    disable_pretest,
    benchmark_options: list[BenchmarkOption],
    subset=None,
    docker_image=None,
):
    """
    Perform benchmarking on submissions.

    Args:
        testfile (Path): Path to the test file.
        output (Path): Path to the output directory.
        solutions (List[Path]): List of paths to the solutions.
        timeout (int): Timeout value in seconds.
        disable_pretest (bool): Flag indicating whether to skip testing the correctness of the solutions.
        benchmark_options (list[BenchmarkOption]): List of benchmarking options.
        subset (str): Subset of the test file to use.
        docker_image (str): Docker image to use for benchmarking.

    Returns:
        List[Path]: List of correct solutions.
    """
    logger.info("Benchmarking submissions")
    if output.exists():
        # remove everything in the output directory, except the .md files
        for path in output.glob("*"):
            if path.suffix != ".md":
                if path.is_dir():
                    shutil.rmtree(path)
                else:
                    path.unlink()
    output.mkdir(exist_ok=True)

    testfile = testfile.resolve()
    prep_workdir(testfile.parent)

    if not disable_pretest:
        # test solution correctness and report errors
        logger.info("Testing correctness.")
        all_correct_solutions = []
        for solution in solutions:
            try:
                if docker_image:
                    run_once_docker(docker_image, solution, testfile, timeout)
                else:
                    run_once(solution, testfile, timeout)
                # code = with_timeout(timeout, action='timeout')(exec)(command)
                # if code == 'timeout':
                #     logger.error(f"Timeout while testing '{solution.stem}'")
                #     continue
                # exec(command)
            except FileNotFoundError:
                logger.error(f"File not found while testing '{solution.stem}'")
                continue
            except subprocess.TimeoutExpired:
                logger.error(f"Timeout while testing '{solution.stem}'")
                continue
            except subprocess.CalledProcessError as e:
                logger.error(f"Error while testing '{solution.stem}'; {e}")
                continue
            all_correct_solutions.append(solution)
        logger.info(f"Correct solutions: {len(all_correct_solutions)}")
    else:
        all_correct_solutions = solutions

    if BenchmarkOption.HYPERFINE.value in benchmark_options:
        run_hyperfine_all(output, all_correct_solutions, testfile, subset=subset)

    # prepare for memory profiling
    n_memory_profiles = 3

    for path in all_correct_solutions:
        # change work dir to the solutions path
        if BenchmarkOption.MEMRAY_TRACKER.value in benchmark_options:
            peaks = []
            for i in range(n_memory_profiles):
                logger.debug(f"Running memray on {path}, {i}")
                workdir = prep_workdir(testfile.parent)
                i_output = output / f"memray_{i}"
                i_output.mkdir(exist_ok=True)
                memray_peak = run_memray(i_output, path, testfile, workdir, use_tracker=True, timeout=timeout)
                logger.debug(f"Peak memory usage: {memray_peak}")
                peaks.append(memray_peak)
            # get median peak memory usage, with support for KiB and MiB
            median_peak = sorted(peaks, key=lambda x: key_by_memory(x))[len(peaks) // 2]
            logger.info(f"Median peak memory usage: {median_peak}")
            # write median peak memory usage to file
            output_peak = output / f"{path.stem}_memray.txt"
            output_peak.write_text(str(median_peak))
        if BenchmarkOption.MEMRAY_IMPORTS.value in benchmark_options:
            logger.debug(f"Running memray on {path}")
            workdir = prep_workdir(testfile.parent)
            i_output = output / "memray_imports"
            i_output.mkdir(exist_ok=True)
            memray_peak = run_memray(i_output, path, testfile, workdir, use_tracker=False, timeout=timeout)
            logger.debug(f"Peak memory usage: {memray_peak}")
            # write peak memory usage to file
            output_peak = output / f"{path.stem}_memray_imports.txt"
            output_peak.write_text(str(memray_peak))
        if BenchmarkOption.SCALENE.value in benchmark_options:
            from benchie.scalene import run_scalene

            peaks = []
            for i in range(n_memory_profiles):
                i_output = output / f"memory_{i}"
                i_output.mkdir(exist_ok=True)
                run_scalene(i_output, path, testfile)
    return all_correct_solutions

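A sketch of calling `benchmark()` directly (paths and the exercise layout are hypothetical; in normal use `main()` drives this over each data file):

```python
from pathlib import Path

from benchie.benchmark import BenchmarkOption, benchmark

# Hypothetical layout: one data_*.py test file and a folder of candidate solutions.
testfile = Path("data/reconstruction/data_0.py")
solutions = sorted(Path("solutions/reconstruction").glob("*.py"))

# benchmark() only creates the leaf output folder, so make sure the parent exists.
Path("output/reconstruction").mkdir(parents=True, exist_ok=True)

correct = benchmark(
    testfile,
    output=Path("output/reconstruction/data_0"),
    solutions=solutions,
    timeout=60,
    disable_pretest=False,
    benchmark_options=[BenchmarkOption.HYPERFINE.value],
)
print(f"{len(correct)} solutions passed the pretest")
```
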
create_command(path, testfile, interpreter='python')

Create a command to execute a test file using a given path and interpreter.

Source code in src/benchie/benchmark.py
def create_command(path, testfile, interpreter="python"):
    """Create a command to execute a test file using a given path and interpreter."""
    if path.is_dir():
        assert (path / "src").exists(), f"Source folder {path / 'src'} does not exist"
        module = list((path / "src").iterdir())[0].name
    else:
        module = path.name.removesuffix(".py")
    fn_command = testfile.read_text()
    command = f"""import {module}; {module}.{fn_command}
    """
    return command

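For intuition, a sketch of the command string this produces, assuming (hypothetically) that the test file's entire text is the call to run on the imported module:

```python
from pathlib import Path

from benchie.benchmark import create_command

# Hypothetical files: fast_solver.py defines solve(), and the test file contains
# the single line: solve(6, "J02459.1.6mers")
solution = Path("solutions/reconstruction/fast_solver.py")
testfile = Path("data/reconstruction/data_0.py")

command = create_command(solution, testfile)
# command is roughly: 'import fast_solver; fast_solver.solve(6, "J02459.1.6mers")'
```
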
reporting

create_table(with_imports, with_tracker, timings=None)

Expected output:

| Command | Mean [s] | Min [s] | Max [s] | Rank |
|:---|---:|---:|---:|---:|
| `13309298` | 4.500 ± 0.036 | 4.474 | 4.541 | 1.00 |
| `13309297` | 4.515 ± 0.116 | 4.445 | 4.648 | 1.00 ± 0.03 |

>>> create_table({'13309298': '1.0 MB'}, None)
' | Command | (with_imports) Peak memory | Rank | \n | :--- | ---: | ---: | \n | `13309298` | 1.0 MB | 0 | '
>>> create_table({'13309298': '1.0 MB'}, None, {'results': [{'command': '13309298', 'mean': 4.5, 'stddev': 0.036, 'min': 4.474, 'max': 4.541}]})
' | Command | Mean [s] | Min [s] | Max [s] | (with_imports) Peak memory | Rank | \n | :--- | ---: | ---: | ---: | ---: | ---: | \n | `13309298` | 4.500 ± 0.036 | 4.474 | 4.541 | 1.0 MB | 0 | '

Source code in src/benchie/reporting.py
def create_table(with_imports, with_tracker, timings=None):
    r"""
    Expected output:

    | Command | Mean [s] | Min [s] | Max [s] | Rank |
    |:---|---:|---:|---:|---:|
    | `13309298` | 4.500 ± 0.036 | 4.474 | 4.541 | 1.00 |
    | `13309297` | 4.515 ± 0.116 | 4.445 | 4.648 | 1.00 ± 0.03 |

    >>> create_table({'13309298': '1.0 MB'}, None)
    ' | Command | (with_imports) Peak memory | Rank | \n | :--- | ---: | ---: | \n | `13309298` | 1.0 MB | 0 | '
    >>> create_table({'13309298': '1.0 MB'}, None, {'results': [{'command': '13309298', 'mean': 4.5, 'stddev': 0.036, 'min': 4.474, 'max': 4.541}]})
    ' | Command | Mean [s] | Min [s] | Max [s] | (with_imports) Peak memory | Rank | \n | :--- | ---: | ---: | ---: | ---: | ---: | \n | `13309298` | 4.500 ± 0.036 | 4.474 | 4.541 | 1.0 MB | 0 | '
    """
    output = []
    d = " | "

    if timings is None:
        header = ["Command"]
        if with_imports:
            header.append("(with_imports) Peak memory")
        if with_tracker:
            header.append("(with_tracker) Median peak memory")
        header.append("Rank")
        output.append(d + d.join(header) + d)
        output.append(d + d.join([":---", *["---:" for _ in range(len(header) - 1)]]) + d)

        relative_peaks = make_relative(with_imports or with_tracker)
        for k in relative_peaks:
            columns = [f"`{k}`"]
            if with_imports:
                columns.append(with_imports[k])
            if with_tracker:
                columns.append(with_tracker[k])
            columns.append(str(relative_peaks[k]))
            output.append(d + d.join([str(c) for c in columns]) + d)
    else:
        header = ["Command", "Mean [s]", "Min [s]", "Max [s]"]
        if with_imports:
            header.append("(with_imports) Peak memory")
        if with_tracker:
            header.append("(with_tracker) Median peak memory")
        header.append("Rank")
        output.append(d + d.join(header) + d)
        output.append(d + d.join([":---", *["---:" for _ in range(len(header) - 1)]]) + d)
        d_relative = make_relative(with_imports or with_tracker)
        for c in timings["results"]:
            name = c["command"]
            columns = [
                f"{x:.3f}" if isinstance(x, float) else str(x)
                for x in [
                    f"`{name}`",
                    # mean + stdev,
                    f"{c['mean']:.3f} ± {c['stddev']:.3f}",
                    # min,
                    c["min"],
                    # max,
                    c["max"],
                ]
            ]
            if with_imports:
                columns.append(with_imports[name])
            if with_tracker:
                columns.append(with_tracker[name])
            columns.append(str(d_relative[name]))
            output.append(d + d.join(columns) + d)
    return "\n".join(output)

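A sketch with hypothetical measurements, showing the memory-only table (no `timings`):

```python
from benchie.reporting import create_table

# Hypothetical peak-memory results keyed by submission id.
with_imports = {"13309298": "120.0 MB", "13309297": "95.0 MB"}
with_tracker = {"13309298": "40.0 MB", "13309297": "35.0 MB"}

print(create_table(with_imports, with_tracker))
# Produces a markdown table with (with_imports) and (with_tracker) columns plus a Rank column.
```
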
key_by_memory(s)

Convert a human-readable memory size string (e.g. '512.0 KiB', '1.0 MB', '2.0 GB') into a numeric sort key in megabytes; strings with an unrecognized suffix sort last.

>>> key_by_memory('1.0 MB')
1.0
>>> key_by_memory('1.0 GB')
1000.0

Source code in src/benchie/reporting.py
def key_by_memory(s):
    """_summary_

    :param s: _description_
    :return: _description_
    >>> key_by_memory('1.0 MB')
    1.0
    >>> key_by_memory('1.0 GB')
    1000.0
    """
    if s.endswith("KB") or s.endswith("KiB"):
        return float(s[:-3]) / 1000
    elif s.endswith("MB") or s.endswith("MiB"):
        return float(s[:-3])
    elif s.endswith("GB") or s.endswith("GiB"):
        return float(s[:-3]) * 1000
    else:
        # e.g. None
        return float("inf")

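This is the key `benchmark()` uses above to pick the median peak; a sketch with hypothetical values:

```python
from benchie.reporting import key_by_memory

# Hypothetical peaks reported by memray; all values are converted to megabytes for sorting.
peaks = ["512.0 KiB", "1.5 GB", "300.0 MB"]
print(sorted(peaks, key=key_by_memory))
# -> ['512.0 KiB', '300.0 MB', '1.5 GB']
```
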
make_relative(d)

Rank the entries of a mapping from command name to peak memory string, returning a mapping from name to rank (0 = lowest memory usage).

>>> make_relative({'a': '1.0 MB', 'b': '2.0 MB'})
{'a': 0, 'b': 1}
>>> make_relative({'a': '1.0 MB', 'b': '1.0 MB'})
{'a': 0, 'b': 1}
>>> make_relative({'a': '1.0 GB', 'b': '2.0 KB'})
{'b': 0, 'a': 1}

Source code in src/benchie/reporting.py
def make_relative(d):
    """_summary_

    :param d: _description_
    :return: _description_
    >>> make_relative({'a': '1.0 MB', 'b': '2.0 MB'})
    {'a': 0, 'b': 1}
    >>> make_relative({'a': '1.0 MB', 'b': '1.0 MB'})
    {'a': 0, 'b': 1}
    >>> make_relative({'a': '1.0 GB', 'b': '2.0 KB'})
    {'b': 0, 'a': 1}
    """
    sort_d = sorted(d.items(), key=lambda x: key_by_memory(x[1]))
    d_relative = {k: i for i, (k, _) in enumerate(sort_d)}
    return d_relative
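
A sketch with hypothetical measurements, mirroring how `create_table()` derives the Rank column:

```python
from benchie.reporting import make_relative

# Hypothetical peak-memory results keyed by submission id.
peaks = {"13309298": "120.0 MB", "13309297": "95.0 MB", "13309296": "2.1 GB"}
print(make_relative(peaks))
# -> {'13309297': 0, '13309298': 1, '13309296': 2}
```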