Contents

Polars confusion

Abstract
We explore how to use polars’ out of the box optimizations to make parallelized computations. An interesting example is the confusion of a classifier model and classical derived metrics like precision, recall or the f1-score.
Note
Polars is a blazingly fast DataFrame library implemented in Rust. It just had its 1.0 (actually 1.1) release and is a production ready tool with a stable an well designed api.

Setup

Let us first generate some dummy data, which consists of labels (y_true) and model scores (y_prob) with values in [0,1]. Furthermore we generate grouping variables id_1 and id_2, which will later be used to partition our data.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import polars as pl
import polars.selectors as cs

import numpy as np

np.random.seed(314)

def generate_data(n: int) -> pl.DataFrame:
  """
  Generate a DataFrame with random classifier output.

  Parameters:
    n (int): The number of rows in the DataFrame.

  Returns:
    pl.DataFrame: The generated DataFrame.

  """
  y_true = np.random.choice([True, False], n)
  y_prob = np.random.uniform(0, 1, n)

  id_1 = np.random.choice(15, n)
  id_2 = np.random.choice(10, n)

  schema = {
    "id_1": pl.Int32,
    "id_2": pl.Int32,
    "y_true": pl.Boolean,
    "y_prob": pl.Float32,
  }

  df = pl.DataFrame(
    {
      "id_1": id_1.tolist(),
      "id_2": id_2.tolist(),
      "y_true": y_true.tolist(),
      "y_prob": y_prob.tolist(),
    },
    schema=schema,
  )

  return df

print(generate_data(10).head())
shape: (5, 4)
┌──────┬──────┬────────┬──────────┐
│ id_1 ┆ id_2 ┆ y_true ┆ y_prob   │
│ ---  ┆ ---  ┆ ---    ┆ ---      │
│ i32  ┆ i32  ┆ bool   ┆ f32      │
╞══════╪══════╪════════╪══════════╡
│ 13   ┆ 2    ┆ true   ┆ 0.827355 │
│ 6    ┆ 0    ┆ false  ┆ 0.727951 │
│ 13   ┆ 7    ┆ false  ┆ 0.26048  │
│ 2    ┆ 3    ┆ false  ┆ 0.911763 │
│ 7    ┆ 2    ┆ true   ┆ 0.260757 │
└──────┴──────┴────────┴──────────┘

Problem description

Metrics like precision, recall or the f1-score are defined entirely in terms of the confusion (i.e. the number true positives, false positives, true negatives and false negatives). These in turn are defined in terms of a threshold which defines the boolean predictions, being positive if and only if the score is greater than or equal to the threshold.

Thus we can ask ourselves: Which threshold can we use to optimize for example the f1-score ?

The most naive approach is to compute all values of the f1-score for a large enough number of thresholds to find some theta for which the f1-score is maximal.

The next question could be: Which thresholds can we use to optimize the f1-score on certain partitions of our data ?

We will answer both questions at once since the first question is a special case (with a one element partition).

Working with wide dataframes

An elegant solution to the problem is to use wide dataframes. For each theta, we generate a boolean column y_pred(theta) and four columns defining the confusion with respect to that theta. Furthermore, we add one more column with our metric in question, lets say the f1-score. In total we get 6*len(theta) extra columns.

Now, the whole magic of why polars is such a great choice to solve this problem is that all expression will be calculated in parallel, and in addition, we can easily group our dataframe by our id (grouping) variables that define the partition.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def add_y_pred(theta: list[float]) -> dict[str, pl.Expr]:
    """
    Add columns to the DataFrame with boolean predictions for different thresholds.
    """
    return {
        f"y_pred_{i}" : pl.col("y_prob") >= theta 
          for i, theta in enumerate(theta)
    }

def add_confusion(theta: list[float]) -> dict[str, pl.Expr]:
    """
    Add columns to the DataFrame with the confusion matrix for different thresholds.
    """
    return {
        f"tp_{i}" : pl.col("y_true") & pl.col(f"y_pred_{i}")  
          for i, theta in enumerate(theta)
    } | {
        f"fp_{i}" : ~pl.col("y_true") & pl.col(f"y_pred_{i}") 
          for i, theta in enumerate(theta)
    } | {
        f"tn_{i}" : ~pl.col("y_true") & ~pl.col(f"y_pred_{i}") 
          for i, theta in enumerate(theta)
    } | {
        f"fn_{i}" : pl.col("y_true")  & ~pl.col(f"y_pred_{i}") 
          for i, theta in enumerate(theta)
    }

def add_f1_score(theta: list[float]) -> dict[str, pl.Expr]:
    """
    Add columns to the DataFrame with the f1-score for different thresholds.
    Uses the confusion matrix.
    """
    return {
        f"f1_score_{i}" : 2 * pl.col(f"tp_{i}").sum() / (
            2 * pl.col(f"tp_{i}").sum() + pl.col(f"fp_{i}").sum() + pl.col(f"fn_{i}").sum()
          ) for i, theta in enumerate(theta)
    }

def select_best_theta(
  df: pl.DataFrame,
  theta: list[float],
) -> pl.DataFrame:
  """
  Select the best threshold.
  """
  df_theta = pl.DataFrame({"index" : range(len(theta)), "theta" : theta},
    schema={
      "index" : pl.UInt32, 
      "theta" : pl.Float32,
    })

  return (
    df
    .with_columns(
        theta_opt_ind = pl.concat_list(cs.starts_with("f1")).list.arg_max(),
        f1_opt = pl.concat_list(cs.starts_with("f1")).list.max(),
    )
    .join(
      df_theta,
      left_on="theta_opt_ind",
      right_on="index"
    )
    .rename({
      "theta" : "theta_opt",
    })
  )


def optimize(df: pl.DataFrame, theta: list[float], group_by: list[str]) -> pl.DataFrame:
  """
  Optimize the f1-score for different thresholds on different partitions of the data.
  """
  return (
    df.with_columns(
        **add_y_pred(theta),
    )
    .with_columns(
        **add_confusion(theta),
    )
    .group_by(group_by)
    .agg(
        **add_f1_score(theta)
    )
    .select(
        *group_by, cs.starts_with("f1")
    )
    .pipe(
      select_best_theta, theta
    )
    .select(
        *group_by, "theta_opt", "f1_opt",
    )
  )
Example
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
theta = [0.1, 0.5]
groups=["id_1"]
df=generate_data(100)

df_wide = (
    df.with_columns(
        **add_y_pred(theta),
    )
    .with_columns(
        **add_confusion(theta),
    )
)
df_grouped = (
  df_wide
  .group_by(groups)
  .agg(
    **add_f1_score(theta)
  )
)

df_opt = (
  df_grouped
  .pipe(select_best_theta, theta)
  .select(
    *groups, "theta_opt_ind", "theta_opt", "f1_opt",
  )
)

print("DataFrame with confusion and boolean predictions:")
print(df_wide.head())
print("f1-scores for different thresholds and groups:")
print(df_grouped.head())
print("Optimal thresholds and f1-scores:")
print(df_opt.head())
DataFrame with confusion and boolean predictions:
shape: (5, 14)
┌──────┬──────┬────────┬──────────┬───┬───────┬───────┬───────┬───────┐
│ id_1 ┆ id_2 ┆ y_true ┆ y_prob   ┆ … ┆ tn_0  ┆ tn_1  ┆ fn_0  ┆ fn_1  │
│ ---  ┆ ---  ┆ ---    ┆ ---      ┆   ┆ ---   ┆ ---   ┆ ---   ┆ ---   │
│ i32  ┆ i32  ┆ bool   ┆ f32      ┆   ┆ bool  ┆ bool  ┆ bool  ┆ bool  │
╞══════╪══════╪════════╪══════════╪═══╪═══════╪═══════╪═══════╪═══════╡
│ 5    ┆ 6    ┆ false  ┆ 0.771075 ┆ … ┆ false ┆ false ┆ false ┆ false │
│ 13   ┆ 8    ┆ true   ┆ 0.566425 ┆ … ┆ false ┆ false ┆ false ┆ false │
│ 0    ┆ 6    ┆ true   ┆ 0.306617 ┆ … ┆ false ┆ false ┆ false ┆ true  │
│ 13   ┆ 7    ┆ false  ┆ 0.977795 ┆ … ┆ false ┆ false ┆ false ┆ false │
│ 6    ┆ 0    ┆ true   ┆ 0.88869  ┆ … ┆ false ┆ false ┆ false ┆ false │
└──────┴──────┴────────┴──────────┴───┴───────┴───────┴───────┴───────┘
f1-scores for different thresholds and groups:
shape: (5, 3)
┌──────┬────────────┬────────────┐
│ id_1 ┆ f1_score_0 ┆ f1_score_1 │
│ ---  ┆ ---        ┆ ---        │
│ i32  ┆ f64        ┆ f64        │
╞══════╪════════════╪════════════╡
│ 9    ┆ 0.666667   ┆ 0.571429   │
│ 0    ┆ 0.8        ┆ 0.545455   │
│ 12   ┆ 0.666667   ┆ 0.0        │
│ 5    ┆ 0.2        ┆ 0.222222   │
│ 13   ┆ 0.571429   ┆ 0.666667   │
└──────┴────────────┴────────────┘
Optimal thresholds and f1-scores:
shape: (5, 4)
┌──────┬───────────────┬───────────┬──────────┐
│ id_1 ┆ theta_opt_ind ┆ theta_opt ┆ f1_opt   │
│ ---  ┆ ---           ┆ ---       ┆ ---      │
│ i32  ┆ u32           ┆ f32       ┆ f64      │
╞══════╪═══════════════╪═══════════╪══════════╡
│ 9    ┆ 0             ┆ 0.1       ┆ 0.666667 │
│ 0    ┆ 0             ┆ 0.1       ┆ 0.8      │
│ 12   ┆ 0             ┆ 0.1       ┆ 0.666667 │
│ 5    ┆ 1             ┆ 0.5       ┆ 0.222222 │
│ 13   ┆ 1             ┆ 0.5       ┆ 0.666667 │
└──────┴───────────────┴───────────┴──────────┘

More data

Now, lets see if we can handle more data. Note that the output is generated on a single core machine and hence the parallelization is not any help here, however I think that the result is still quite impressive. PS: I tried to implement the same logic in pandas, however the performance was not even close to the one of polars and I got a lot of warnings and errors about too many columns. Feel free to benchmark this on your own or any other machine. On my 8 core 16 GB machine, the following code runs in approximately one second.

1
2
3
import psutil
print(f"CPU: {psutil.cpu_count()}")
print(f"Memory: {psutil.virtual_memory().total / 1024 ** 2} MB")
CPU: 1
Memory: 769.046875 MB
1
2
groups=["id_1", "id_2"]
df = generate_data(1_000_000)
1
2
3
4
%%time
# use 100 equidistant thresholds
opt = optimize(df, np.arange(0,1, 0.01), groups)
print(opt.head(5))
shape: (5, 4)
┌──────┬──────┬───────────┬──────────┐
│ id_1 ┆ id_2 ┆ theta_opt ┆ f1_opt   │
│ ---  ┆ ---  ┆ ---       ┆ ---      │
│ i32  ┆ i32  ┆ f32       ┆ f64      │
╞══════╪══════╪═══════════╪══════════╡
│ 8    ┆ 8    ┆ 0.0       ┆ 0.668224 │
│ 9    ┆ 2    ┆ 0.0       ┆ 0.671309 │
│ 1    ┆ 4    ┆ 0.0       ┆ 0.663598 │
│ 6    ┆ 7    ┆ 0.0       ┆ 0.668925 │
│ 5    ┆ 8    ┆ 0.0       ┆ 0.668914 │
└──────┴──────┴───────────┴──────────┘
CPU times: user 1.41 s, sys: 101 ms, total: 1.51 s
Wall time: 1.56 s
1
2
3
4
5
6
7
8
9
import hvplot
hvplot.extension("matplotlib")

(
  opt
  .with_columns(f1_opt=pl.col("f1_opt").round(2))
  .plot
  .heatmap("id_1", "id_2", "f1_opt", height=600, width=800)
)

Feel free to experiment with the code or to build a benchmark using different techniques and tools.