-
Notifications
You must be signed in to change notification settings - Fork 76
/
api_jobs.R
148 lines (148 loc) · 5.57 KB
/
api_jobs.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#' @title Estimate the memory needed to process a job
#' @noRd
#' @param job_size Size of each block to be processed
#' @param npaths Number of inputs (n_bands * n_times)
#' @param nbytes Number of bytes per image
#' @param proc_bloat Estimated processing bloat factor
#' @returns Estimated job size in GB (the 1e-09 factor converts bytes to GB)
.jobs_memsize <- function(job_size, npaths, nbytes, proc_bloat) {
    # Memory needed per job: total bytes scaled from bytes to GB.
    # NOTE(review): previous doc said "MB", but 1e-09 is a gigabyte
    # conversion and matches the unit of the `memsize` parameter elsewhere.
    job_size * npaths * nbytes * proc_bloat * 1e-09
}
#' @title Estimate the number of multicores to be used
#' @noRd
#' @param job_memsize Total memory required for job
#' @param memsize Memory available (in MB)
#' @param multicores Number of cores available for processing
#' @returns Number of cores required for processing
.jobs_max_multicores <- function(job_memsize, memsize, multicores) {
    # Register this function as the caller for error reporting
    .check_set_caller(".jobs_max_multicores")
    # Fail early if available memory cannot hold even one block
    .check_that(job_memsize < memsize)
    # Number of blocks that fit in the available memory at the same time
    blocks_in_memory <- floor(memsize / job_memsize)
    # Cores are capped by how many blocks fit in memory
    min(multicores, blocks_in_memory)
}
#' @title Update block parameter
#' @noRd
#' @param job_memsize Total memory required for job
#' @param block Initial estimate of block size (named `ncols`/`nrows`)
#' @param image_size Size of image to be processed (named `ncols`/`nrows`)
#' @param memsize Memory available (in MB)
#' @param multicores Number of cores available for processing
#' @returns Optimal estimate of block size (named `ncols`/`nrows`)
.jobs_optimal_block <- function(job_memsize, block, image_size, memsize,
                                multicores) {
    # Memory share available to each core
    mem_per_core <- memsize / multicores
    # Number of blocks each core can hold at once (at least one)
    blocks_per_core <- max(1, floor(mem_per_core / job_memsize))
    # Blocks needed to span the image horizontally
    n_hblocks <- ceiling(image_size[["ncols"]] / block[["ncols"]])
    if (blocks_per_core < n_hblocks * 2) {
        # 1st optimization (line level): widen blocks along one line.
        # Number of segments needed to cover a whole line of blocks
        h_nsegs <- ceiling(n_hblocks / blocks_per_core)
        return(c(
            ncols = ceiling(n_hblocks / h_nsegs) * block[["ncols"]],
            nrows = block[["nrows"]]
        ))
    }
    # 2nd optimization (area level): grow blocks vertically.
    # Full lines of blocks each core can process
    lines_per_core <- floor(blocks_per_core / n_hblocks)
    # Blocks needed to span the image vertically
    n_vblocks <- ceiling(image_size[["nrows"]] / block[["nrows"]])
    # Number of segments needed to cover all vertical blocks
    v_nsegs <- ceiling(n_vblocks / lines_per_core)
    # Block spans the full image width; height is clamped to the image
    c(
        ncols = min(n_hblocks * block[["ncols"]], image_size[["ncols"]]),
        nrows = min(
            ceiling(n_vblocks / v_nsegs) * block[["nrows"]],
            image_size[["nrows"]]
        )
    )
}
#' @title Return the number of multicores used
#' @noRd
#' @returns Number of multicores
.jobs_multicores <- function() {
    # One worker per element registered in the package cluster
    workers <- sits_env[["cluster"]]
    length(workers)
}
#' @title Return list of jobs
#' @noRd
#' @param jobs Jobs to be processed
#' @returns List of jobs
.jobs_split <- function(jobs) {
    # All jobs are placed in a single round (no finer splitting here)
    list(jobs)
}
#' @title Run a sequential function for all jobs
#' @noRd
#' @param jobs Jobs to be processed
#' @param fn Function to be run sequentially
#' @param ... Additional parameters forwarded to `fn`
#' @returns List with function results
.jobs_map_sequential <- function(jobs, fn, ...) {
    # Apply fn to each job (row-wise) and collect the results in a list
    results <- slider::slide(jobs, fn, ...)
    results
}
#' @title Run a sequential function for all jobs and return vector
#' @noRd
#' @param jobs Jobs to be processed
#' @param fn Function to be run sequentially
#' @param ... Additional parameters forwarded to `fn`
#' @returns Character vector with function results
.jobs_map_sequential_chr <- function(jobs, fn, ...) {
    # Apply fn to each job (row-wise), coercing results to character
    results <- slider::slide_chr(jobs, fn, ...)
    results
}
#' @title Run a sequential function for all jobs and return data.frame
#' @noRd
#' @param jobs Jobs to be processed
#' @param fn Function to be run sequentially
#' @param ... Additional parameters forwarded to `fn`
#' @returns Data.frame with function results
.jobs_map_sequential_dfr <- function(jobs, fn, ...) {
    # Apply fn to each job (row-wise) and row-bind results into a data.frame
    results <- slider::slide_dfr(jobs, fn, ...)
    results
}
#' @title Run a parallel function for all jobs
#' @noRd
#' @param jobs Jobs to be processed
#' @param fn Function to be run in parallel
#' @param ... Additional parameters for function
#' @param sync_fn Function to synchronize jobs between rounds
#' @param progress Show progress bar?
#' @returns List with function results
.jobs_map_parallel <- function(jobs, fn, ..., sync_fn = NULL,
                               progress = FALSE) {
    # Jobs are grouped into rounds; splitting matters only when sync_fn is set
    rounds <- .jobs_split(jobs)
    round_results <- lapply(rounds, function(round) {
        # Synchronize before processing this round, when requested
        if (!is.null(sync_fn)) {
            sync_fn(round)
        }
        # Break the round into a list of individual jobs
        job_list <- slider::slide(round, identity)
        .parallel_map(job_list, fn, ..., progress = progress)
    })
    # Flatten one level so results of all rounds form a single list
    unlist(round_results, recursive = FALSE)
}
#' @title Run a parallel function for all jobs and return vector
#' @noRd
#' @param jobs Jobs to be processed
#' @param fn Function to be run in parallel
#' @param ... Additional parameters for function
#' @param progress Show progress bar?
#' @returns Character vector with function results
.jobs_map_parallel_chr <- function(jobs, fn, ..., progress = FALSE) {
    results <- .jobs_map_parallel(jobs, fn, ..., progress = progress)
    # Flatten to a character vector, enforcing one string per job
    vapply(results, c, NA_character_)
}
#' @title Run a parallel function for all jobs and return data.frame
#' @noRd
#' @param jobs Jobs to be processed
#' @param fn Function to be run in parallel
#' @param ... Additional parameters for function
#' @param progress Show progress bar?
#' @returns Data.frame with function results
.jobs_map_parallel_dfr <- function(jobs, fn, ..., progress = FALSE) {
    results <- .jobs_map_parallel(jobs, fn, ..., progress = progress)
    # Row-bind the per-job results into a single data.frame
    dplyr::bind_rows(results)
}