Dataset processor

In this section, we’ll create the Dataset processor component and generate the first test resources. The Dataset processor component uses one of the pre-defined Common datasets to create the AnnData files used in your task. These AnnData objects then serve as the first test resources for testing Method and Metric components.

Note

In the task template, the Dataset processor component has already been created. You can find it in the src/data_processors/process_dataset directory. Use it as a reference for creating your own Dataset processor component, or adjust it as needed.

Why?

The Dataset processor component splits the Common dataset into separate AnnData objects so that Method and Metric components can only observe the information specified by the file format specification files. This not only safeguards against data leakage, but also ensures that the Method component can’t accidentally change the data structure that the Metric component relies on.

How?

The Dataset processor, like any component in OpenProblems, is a Viash component, which consists of a config and a script.

Step 1: Create the Viash config

Start by creating the Viash config. Luckily, we already created its file format and argument list in the previous section, so we can just import the interface using the __merge__ field:

src/data_processors/process_dataset/config.vsh.yaml
__merge__: ../../api/comp_data_processor.yaml # (1)
name: process_dataset # (2)
# arguments: # (3)
#   - name: "--method"
#     type: "string"
#     description: "The process method to assign train/test."
#     choices: ["batch", "random"]
#     default: "batch"
resources: # (4)
  - type: python_script # (5)
    path: script.py
  - path: /common/helper_functions/subset_h5ad_by_format.py # (6)

engines:
  - type: docker # (7)
    image: openproblems/base_python:1.0.0

runners:
  - type: executable
  - type: nextflow # (8)
    directives:
      label: [highmem, highcpu, hightime]

(1) The interface to use for this component.
(2) The name of the component. In this case, this should always be set to process_dataset.
(3) (Optional) Arguments that can be passed to the component.
(4) Resources are files that provide the logic of the component.
(5) We’ll create the Python script in the next step.
(6) (Optional) Helper functions you can use in your script.
(7) A Docker engine to make the component reproducible.
(8) A Nextflow runner to allow turning this component into a Nextflow module (and also standalone pipeline).

Step 2: Create the script

Next, create the script which will split the common dataset into a train and solution dataset.

src/data_processors/process_dataset/script.py
import anndata as ad

## VIASH START
par = {
    "input": "resources_test/common/cxg_mouse_pancreas_atlas/dataset.h5ad",
    "output_dataset": "dataset.h5ad",
    "output_solution": "solution.h5ad",
}
## VIASH END

print(">> Load common dataset", flush=True)
adata = ad.read_h5ad(par["input"])

# <- implement logic for splitting the common dataset
#    into a dataset and a solution object here.

print(">> Create dataset for methods", flush=True)
output_dataset = ad.AnnData(
  # <- add the data you wish to store in the output_dataset object here
)

print(">> Create solution object for metrics", flush=True)
output_solution = ad.AnnData(
  # <- add the data you wish to store in the output_solution object here
)

print(">> Write to disk", flush=True)
output_dataset.write_h5ad(par["output_dataset"])
output_solution.write_h5ad(par["output_solution"])

src/data_processors/process_dataset/script.R
library(anndata)

## VIASH START
par <- list(
  input = "resources_test/common/cxg_mouse_pancreas_atlas/dataset.h5ad",
  output_dataset = "dataset.h5ad",
  output_solution = "solution.h5ad"
)
## VIASH END

cat(">> Load common dataset\n")
adata <- anndata::read_h5ad(par[["input"]])

# <- implement logic for splitting the common dataset
#    into a dataset and a solution object here.

cat(">> Create dataset for methods\n")
output_dataset <- anndata::AnnData(
  # <- add the data you wish to store in the output_dataset object here
)

cat(">> Create solution object for metrics\n")
output_solution <- anndata::AnnData(
  # <- add the data you wish to store in the output_solution object here
)

cat(">> Write to disk\n")
output_dataset$write_h5ad(par[["output_dataset"]])
output_solution$write_h5ad(par[["output_solution"]])
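The splitting logic left as a placeholder in the scripts above depends on your task. As one possible illustration, the sketch below assigns cells to a train or test split using the two strategies named in the commented-out `--method` argument of the config ("batch" and "random"). It works on a plain list of batch labels rather than an AnnData object, and the function name `assign_split`, the `test_fraction` parameter, and the `seed` parameter are assumptions made for this example, not part of the task template.

```python
import random


def assign_split(batches, method="batch", test_fraction=0.2, seed=0):
    """Return one boolean per cell, True where the cell goes to the test split.

    `batches` holds one batch label per cell. All names are illustrative.
    """
    rng = random.Random(seed)
    if method == "batch":
        # Hold out whole batches, so test cells come from batches unseen in training.
        unique = sorted(set(batches))
        n_test = max(1, round(len(unique) * test_fraction))
        test_batches = set(rng.sample(unique, n_test))
        return [b in test_batches for b in batches]
    if method == "random":
        # Hold out individual cells regardless of their batch.
        n_cells = len(batches)
        n_test = max(1, round(n_cells * test_fraction))
        test_idx = set(rng.sample(range(n_cells), n_test))
        return [i in test_idx for i in range(n_cells)]
    raise ValueError(f"unknown method: {method!r}")


# Toy input: ten cells spread over five batches.
batches = ["b1", "b1", "b2", "b2", "b3", "b3", "b4", "b4", "b5", "b5"]
is_test = assign_split(batches, method="batch")
train_cells = [i for i, t in enumerate(is_test) if not t]
test_cells = [i for i, t in enumerate(is_test) if t]
print(len(train_cells), len(test_cells))
```

In a real script, a boolean mask like `is_test` could be used to index the loaded AnnData object before building the output objects.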
Tip

You can use the helper function subset_h5ad_by_format, defined in common/helper_functions/subset_h5ad_by_format.py, to subset an AnnData object so that it contains only the slots specified by the corresponding API file. Examples of tasks where this is used: Task template, Dimensionality reduction.
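To make the idea of subsetting by format concrete, here is a minimal sketch that uses plain dictionaries as stand-ins for AnnData slots. It is not the implementation or the signature of subset_h5ad_by_format, which this section does not show; the function `subset_slots`, the slot contents, and both format specifications are hypothetical.

```python
def subset_slots(data, spec):
    """Keep only the slots and fields listed in `spec`.

    `data` maps a slot name (e.g. "obs", "layers") to a dict of named fields.
    `spec` maps a slot name to the list of field names that may be kept.
    """
    subset = {}
    for slot, allowed in spec.items():
        fields = data.get(slot, {})
        missing = [name for name in allowed if name not in fields]
        if missing:
            raise KeyError(f"missing fields in {slot!r}: {missing}")
        subset[slot] = {name: fields[name] for name in allowed}
    return subset


# Toy stand-in for a common dataset with two cells and two genes.
common = {
    "layers": {"counts": [[1, 0], [0, 2]], "normalized": [[0.7, 0.0], [0.0, 1.1]]},
    "obs": {"batch": ["b1", "b2"], "label": ["alpha", "beta"]},
}

# Hypothetical format specifications: methods never see the labels.
dataset_spec = {"layers": ["counts", "normalized"], "obs": ["batch"]}
solution_spec = {"obs": ["label"]}

dataset = subset_slots(common, dataset_spec)
solution = subset_slots(common, solution_spec)
print(sorted(dataset["obs"]))
print(sorted(solution["obs"]))
```

Because each output is built only from the fields its specification lists, anything not declared there (such as the labels in this toy case) cannot leak into the object handed to a Method component.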

Step 3: Create test resources

Next, we’ll generate part of the test resources. Create a Bash script that runs the process_dataset component.

scripts/create_datasets/test_resources.sh
#!/bin/bash

# get the root of the directory
REPO_ROOT=$(git rev-parse --show-toplevel)

# ensure that the command below is run from the root of the repository
cd "$REPO_ROOT"

# (1) remove this when you have implemented the script
echo "TODO: replace the commands in this script with the sequence of components that you need to run to generate test_resources."
echo "  Inside this script, you will need to place commands to generate example files for each of the 'src/api/file_*.yaml' files."
exit 1

set -e

RAW_DATA=resources_test/common
DATASET_DIR=resources_test/task_template

mkdir -p $DATASET_DIR

# process dataset
echo Running process_dataset
nextflow run . \
  -main-script target/nextflow/workflows/process_datasets/main.nf \
  -profile docker \
  --publish_dir "$DATASET_DIR" \
  --id "cxg_mouse_pancreas_atlas" \
  --input "$RAW_DATA/cxg_mouse_pancreas_atlas/dataset.h5ad" \
  --output_train '$id/train.h5ad' \
  --output_test '$id/test.h5ad' \
  --output_solution '$id/solution.h5ad' \
  --output_state '$id/state.yaml'

# run one method
viash run src/methods/knn/config.vsh.yaml -- \
    --input_train $DATASET_DIR/cxg_mouse_pancreas_atlas/train.h5ad \
    --input_test $DATASET_DIR/cxg_mouse_pancreas_atlas/test.h5ad \
    --output $DATASET_DIR/cxg_mouse_pancreas_atlas/prediction.h5ad

# run one metric
viash run src/metrics/accuracy/config.vsh.yaml -- \
    --input_prediction $DATASET_DIR/cxg_mouse_pancreas_atlas/prediction.h5ad \
    --input_solution $DATASET_DIR/cxg_mouse_pancreas_atlas/solution.h5ad \
    --output $DATASET_DIR/cxg_mouse_pancreas_atlas/score.h5ad

# (2) only run this if you have access to the openproblems-data bucket
# aws s3 sync --profile op \
#   "$DATASET_DIR" s3://openproblems-data/resources_test/task_template \
#   --delete --dryrun
(1) Remove this when you have implemented the script.
(2) Only run this if you have access to the openproblems-data bucket.

Note

You’ll have to update the output arguments in this script to match the output arguments defined in the API file.

Now run the script to generate the first couple of test resources:

scripts/create_datasets/test_resources.sh
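Once the script has finished, a quick check such as the following sketch could confirm that the expected files were written. The file names are taken from the example script above and will differ once you adapt the output arguments to your API files. The function name `missing_outputs` is hypothetical, and the directory name `cxg_mouse_pancreas_atlas` is an assumption based on the paths used by the method and metric commands in that script.

```python
from pathlib import Path


def missing_outputs(dataset_dir, dataset_id, expected):
    """Return the expected file names that do not exist under dataset_dir/dataset_id."""
    base = Path(dataset_dir) / dataset_id
    return [name for name in expected if not (base / name).is_file()]


# File names as used in the example script; adjust them to your own API files.
expected = ["train.h5ad", "test.h5ad", "solution.h5ad", "state.yaml"]
missing = missing_outputs(
    "resources_test/task_template",  # DATASET_DIR in the script
    "cxg_mouse_pancreas_atlas",      # assumed output directory name
    expected,
)
print("missing files:", missing)
```

An empty list means every expected file is present. Run the check from the root of the repository, since the paths are relative.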