Skip to content

Commit

Permalink
add multicores processing support for active learning sampling methods
Browse files Browse the repository at this point in the history
  • Loading branch information
M3nin0 committed May 16, 2024
1 parent f6749d2 commit a4d7211
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 51 deletions.
18 changes: 13 additions & 5 deletions R/api_raster.R
Original file line number Diff line number Diff line change
Expand Up @@ -168,32 +168,40 @@
#' locations are guaranteed to be separated by a certain number of pixels.
#'
#' @param r_obj A raster object.
#' @param block Individual block that will be processed.
#' @param band A numeric band index used to read bricks.
#' @param n Number of values to extract.
#' @param sampling_window Window size to collect a point (in pixels).
#'
#' @return A point `tibble` object.
#'
.raster_get_top_values <- function(r_obj,
block,
band,
n,
sampling_window) {
# Pre-conditions have been checked in calling functions
# Get top values
# filter by median to avoid borders
# Process window
values <- .raster_get_values(r_obj)
values <- .raster_get_values(
r_obj,
row = block[["row"]],
col = block[["col"]],
nrows = block[["nrows"]],
ncols = block[["ncols"]]
)
values <- C_kernel_median(
x = values,
ncols = .raster_ncols(r_obj),
nrows = .raster_nrows(r_obj),
nrows = block[["nrows"]],
ncols = block[["ncols"]],
band = 0,
window_size = sampling_window
)
samples_tb <- C_max_sampling(
x = values,
nrows = .raster_nrows(r_obj),
ncols = .raster_ncols(r_obj),
nrows = block[["nrows"]],
ncols = block[["ncols"]],
window_size = sampling_window
)
samples_tb <- dplyr::slice_max(
Expand Down
170 changes: 129 additions & 41 deletions R/sits_active_learning.R
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
#' @param min_uncert Minimum uncertainty value to select a sample.
#' @param sampling_window Window size for collecting points (in pixels).
#' The minimum window size is 10.
#' @param multicores Number of workers for parallel processing
#' (integer, min = 1, max = 2048).
#' @param memsize Maximum overall memory (in GB) to run the
#' function.
#'
#' @return
#' A tibble with longitude and latitude in WGS84 with locations
Expand Down Expand Up @@ -75,23 +79,64 @@
sits_uncertainty_sampling <- function(uncert_cube,
n = 100L,
min_uncert = 0.4,
sampling_window = 10L) {
sampling_window = 10L,
multicores = 1L,
memsize = 1L) {
.check_set_caller("sits_uncertainty_sampling")

# Pre-conditions
.check_is_uncert_cube(uncert_cube)
.check_int_parameter(n, min = 1, max = 10000)
.check_num_parameter(min_uncert, min = 0.2, max = 1.0)
.check_int_parameter(sampling_window, min = 10L)

.check_int_parameter(multicores, min = 1, max = 2048)
.check_int_parameter(memsize, min = 1, max = 16384)
# Get block size
block <- .raster_file_blocksize(.raster_open_rast(.tile_path(uncert_cube)))
# Overlapping pixels
overlap <- ceiling(sampling_window / 2) - 1
# Check minimum memory needed to process one block
job_memsize <- .jobs_memsize(
job_size = .block_size(block = block, overlap = overlap),
npaths = sampling_window,
nbytes = 8,
proc_bloat = .conf("processing_bloat_cpu")
)
# Update multicores parameter
multicores <- .jobs_max_multicores(
job_memsize = job_memsize,
memsize = memsize,
multicores = multicores
)
# Update block parameter
block <- .jobs_optimal_block(
job_memsize = job_memsize,
block = block,
image_size = .tile_size(.tile(uncert_cube)),
memsize = memsize,
multicores = multicores
)
# Prepare parallel processing
.parallel_start(workers = multicores)
on.exit(.parallel_stop(), add = TRUE)
# Slide on cube tiles
samples_tb <- slider::slide_dfr(uncert_cube, function(tile) {
path <- .tile_path(tile)
# Create chunks as jobs
chunks <- .tile_chunks_create(
tile = tile,
overlap = overlap,
block = block
)
# Tile path
tile_path <- .tile_path(tile)
# Get a list of values of high uncertainty
top_values <- .raster_open_rast(path) |>
# Process jobs in parallel
top_values <- .jobs_map_parallel_dfr(chunks, function(chunk) {
# Read and preprocess values
.raster_open_rast(tile_path) |>
.raster_get_top_values(
band = 1,
n = n,
block = .block(chunk),
band = 1,
n = n,
sampling_window = sampling_window
) |>
dplyr::mutate(
Expand All @@ -105,6 +150,7 @@ sits_uncertainty_sampling <- function(uncert_cube,
c("longitude", "latitude", "value")
)) |>
tibble::as_tibble()
})
# All the cube's uncertainty images have the same start & end dates.
top_values[["start_date"]] <- .tile_start_date(tile)
top_values[["end_date"]] <- .tile_end_date(tile)
Expand Down Expand Up @@ -174,6 +220,10 @@ sits_uncertainty_sampling <- function(uncert_cube,
#' @param min_margin Minimum margin of confidence to select a sample
#' @param sampling_window Window size for collecting points (in pixels).
#' The minimum window size is 10.
#' @param multicores Number of workers for parallel processing
#' (integer, min = 1, max = 2048).
#' @param memsize Maximum overall memory (in GB) to run the
#' function.
#'
#' @return
#' A tibble with longitude and latitude in WGS84 with locations
Expand Down Expand Up @@ -204,54 +254,92 @@ sits_uncertainty_sampling <- function(uncert_cube,
sits_confidence_sampling <- function(probs_cube,
n = 20L,
min_margin = 0.90,
sampling_window = 10L) {
sampling_window = 10L,
multicores = 1L,
memsize = 1L) {
.check_set_caller("sits_confidence_sampling")

# Pre-conditions
.check_is_probs_cube(probs_cube)
.check_int_parameter(n, min = 20)
.check_num_parameter(min_margin, min = 0.01, max = 1.0)
.check_int_parameter(sampling_window, min = 10)

.check_int_parameter(multicores, min = 1, max = 2048)
.check_int_parameter(memsize, min = 1, max = 16384)
# Get block size
block <- .raster_file_blocksize(.raster_open_rast(.tile_path(probs_cube)))
# Overlapping pixels
overlap <- ceiling(sampling_window / 2) - 1
# Check minimum memory needed to process one block
job_memsize <- .jobs_memsize(
job_size = .block_size(block = block, overlap = overlap),
npaths = sampling_window,
nbytes = 8,
proc_bloat = .conf("processing_bloat_cpu")
)
# Update multicores parameter
multicores <- .jobs_max_multicores(
job_memsize = job_memsize,
memsize = memsize,
multicores = multicores
)
# Update block parameter
block <- .jobs_optimal_block(
job_memsize = job_memsize,
block = block,
image_size = .tile_size(.tile(probs_cube)),
memsize = memsize,
multicores = multicores
)
# Prepare parallel processing
.parallel_start(workers = multicores)
on.exit(.parallel_stop(), add = TRUE)
# get labels
labels <- sits_labels(probs_cube)

# Slide on cube tiles
samples_tb <- slider::slide_dfr(probs_cube, function(tile) {
# Open raster
r_obj <- .raster_open_rast(.tile_path(tile))

# Get samples for each label
purrr::map2_dfr(labels, seq_along(labels), function(lab, i) {
# Get a list of values of high confidence & apply threshold
top_values <- r_obj |>
.raster_get_top_values(
band = i,
n = n,
sampling_window = sampling_window
) |>
dplyr::mutate(
value = .data[["value"]] *
.conf("probs_cube_scale_factor")
) |>
dplyr::filter(
.data[["value"]] >= min_margin
) |>
dplyr::select(dplyr::matches(
c("longitude", "latitude", "value")
)) |>
tibble::as_tibble()
# Create chunks as jobs
chunks <- .tile_chunks_create(
tile = tile,
overlap = overlap,
block = block
)
# Tile path
tile_path <- .tile_path(tile)
# Get a list of values of high uncertainty
# Process jobs in parallel
.jobs_map_parallel_dfr(chunks, function(chunk) {
# Get samples for each label
purrr::map2_dfr(labels, seq_along(labels), function(lab, i) {
# Get a list of values of high confidence & apply threshold
top_values <- .raster_open_rast(tile_path) |>
.raster_get_top_values(
block = .block(chunk),
band = i,
n = n,
sampling_window = sampling_window
) |>
dplyr::mutate(
value = .data[["value"]] *
.conf("probs_cube_scale_factor")
) |>
dplyr::filter(
.data[["value"]] >= min_margin
) |>
dplyr::select(dplyr::matches(
c("longitude", "latitude", "value")
)) |>
tibble::as_tibble()

# All the cube's uncertainty images have the same start &
# end dates.
top_values[["start_date"]] <- .tile_start_date(tile)
top_values[["end_date"]] <- .tile_end_date(tile)
top_values[["label"]] <- lab
# All the cube's uncertainty images have the same start &
# end dates.
top_values[["start_date"]] <- .tile_start_date(tile)
top_values[["end_date"]] <- .tile_end_date(tile)
top_values[["label"]] <- lab

return(top_values)
return(top_values)
})
})
})

# Slice result samples
result_tb <- samples_tb |>
dplyr::group_by(.data[["label"]]) |>
Expand Down
10 changes: 9 additions & 1 deletion man/sits_confidence_sampling.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion man/sits_uncertainty_sampling.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions tests/testthat/test-active_learning.R
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ test_that("Suggested samples have low confidence, high entropy", {
uncert_cube,
min_uncert = 0.3,
n = 100,
sampling_window = 10
sampling_window = 10,
multicores = 2,
memsize = 2
))

expect_true(nrow(samples_df) <= 100)
Expand Down Expand Up @@ -80,15 +82,19 @@ test_that("Increased samples have high confidence, low entropy", {
probs_cube = probs_cube,
n = 20,
min_margin = 0.5,
sampling_window = 10
sampling_window = 10,
multicores = 2,
memsize = 2
)
)
expect_warning(
sits_confidence_sampling(
probs_cube = probs_cube,
n = 60,
min_margin = 0.5,
sampling_window = 10
sampling_window = 10,
multicores = 2,
memsize = 2
)
)
labels <- sits_labels(probs_cube)
Expand Down

0 comments on commit a4d7211

Please sign in to comment.