Skip to content

Commit

Permalink
exp/lighthorizon/build/index-batch: carry over map/reduce updates to …
Browse files Browse the repository at this point in the history
…latest docker layout on feature branch (#4543)
  • Loading branch information
sreuland committed Aug 17, 2022
1 parent adadbcc commit 4838973
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 8 deletions.
19 changes: 16 additions & 3 deletions .github/workflows/horizon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,27 @@ jobs:
# Any range should do for basic testing, this range was chosen pretty early in history so that it only takes a few mins to run
run: |
chmod 755 ./exp/lighthorizon/build/build.sh
./exp/lighthorizon/build/build.sh ledgerexporter stellar latest false
docker run -e ARCHIVE_TARGET=file:https:///ledgerexport-test\
mkdir $PWD/ledgerexport
mkdir $PWD/index
./exp/lighthorizon/build/build.sh all stellar latest false
docker run -e ARCHIVE_TARGET=file:https:///ledgerexport\
-e START=5\
-e END=50\
-e END=150\
-e NETWORK_PASSPHRASE="Test SDF Network ; September 2015"\
-e CAPTIVE_CORE_CONFIG="/captive-core-testnet.cfg"\
-e HISTORY_ARCHIVE_URLS="https://history.stellar.org/prd/core-testnet/core_testnet_001,https://history.stellar.org/prd/core-testnet/core_testnet_002"\
-v $PWD/ledgerexport:/ledgerexport\
stellar/lighthorizon-ledgerexporter
# run map job
docker run -e NETWORK_PASSPHRASE='testnet' -e JOB_INDEX_ENV=AWS_BATCH_JOB_ARRAY_INDEX -e AWS_BATCH_JOB_ARRAY_INDEX=0 -e BATCH_SIZE=64 -e FIRST_CHECKPOINT=64 \
-e WORKER_COUNT=1 -e RUN_MODE=map -v $PWD/ledgerexport:/ledgermeta -e TXMETA_SOURCE=file:https:///ledgermeta -v $PWD/index:/index -e INDEX_TARGET=file:https:///index stellar/lighthorizon-index-batch
# run reduce job
docker run -e NETWORK_PASSPHRASE='testnet' -e JOB_INDEX_ENV=AWS_BATCH_JOB_ARRAY_INDEX -e AWS_BATCH_JOB_ARRAY_INDEX=0 -e MAP_JOB_COUNT=1 -e REDUCE_JOB_COUNT=1 \
-e WORKER_COUNT=1 -e RUN_MODE=reduce -v $PWD/index:/index -e INDEX_SOURCE_ROOT=file:https:///index -e INDEX_TARGET=file:https:///index stellar/lighthorizon-index-batch
# Push images
- if: github.ref == 'refs/heads/master' || github.ref == 'refs/heads/lighthorizon'
name: Login to DockerHub
Expand Down
7 changes: 7 additions & 0 deletions exp/lighthorizon/build/index-batch/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# `stellar/horizon-indexer`

This docker image contains the ledger/checkpoint indexing executables. It allows running multiple instances of `map`/`reduce` on a single machine or running it in [AWS Batch](https://aws.amazon.com/batch/).

## Env variables

See the [package documentation](../../index/cmd/batch/doc.go) for more details
1 change: 0 additions & 1 deletion exp/lighthorizon/build/index-batch/start
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ set -e

export TRACY_NO_INVARIANT_CHECK=1
NETWORK_PASSPHRASE="${NETWORK_PASSPHRASE:=Public Global Stellar Network ; September 2015}"

if [ "$RUN_MODE" == "reduce" ]; then
echo "Running Reduce, REDUCE JOBS: $REDUCE_JOB_COUNT MAP JOBS: $MAP_JOB_COUNT TARGET INDEX: $INDEX_TARGET"
/reduce
Expand Down
43 changes: 43 additions & 0 deletions exp/lighthorizon/build/k8s/lighthorizon_batch_map_job.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
apiVersion: batch/v1
kind: Job
metadata:
name: 'batch-map-job'
spec:
completions: 52
parallelism: 10
completionMode: Indexed
template:
spec:
restartPolicy: Never
containers:
- name: 'worker'
image: 'stellar/lighthorizon-index-batch'
imagePullPolicy: Always
envFrom:
- secretRef:
name: <reference to secret name here if needed for source/target>
env:
- name: RUN_MODE
value: "map"
- name: BATCH_SIZE
value: "10048"
- name: FIRST_CHECKPOINT
value: "41426080"
- name: WORKER_COUNT
value: "8"
- name: TXMETA_SOURCE
value: "<url of txmeta source>"
- name: JOB_INDEX_ENV
value: "JOB_COMPLETION_INDEX"
- name: NETWORK_PASSPHRASE
value: "pubnet"
- name: INDEX_TARGET
value: "url of target index"
resources:
limits:
cpu: 4
memory: 5Gi
requests:
cpu: 500m
memory: 500Mi

42 changes: 42 additions & 0 deletions exp/lighthorizon/build/k8s/lighthorizon_batch_reduce_job copy.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
apiVersion: batch/v1
kind: Job
metadata:
name: 'batch-reduce-job'
spec:
completions: 52
parallelism: 10
completionMode: Indexed
template:
spec:
restartPolicy: Never
containers:
- name: 'worker'
image: 'stellar/lighthorizon-index-batch'
imagePullPolicy: Always
envFrom:
- secretRef:
name: <reference to secret name here if needed for source/target>
env:
- name: RUN_MODE
value: "reduce"
- name: MAP_JOB_COUNT
value: 52
- name: REDUCE_JOB_COUNT
value: 52
- name: WORKER_COUNT
value: 8
- name: INDEX_SOURCE_ROOT
value: "<url of index location>"
- name: JOB_INDEX_ENV
value: JOB_COMPLETION_INDEX
- name: INDEX_TARGET
value: "<url of index location>"
resources:
limits:
cpu: 4
memory: 5Gi
requests:
cpu: 500m
memory: 500Mi


8 changes: 8 additions & 0 deletions exp/lighthorizon/index/backend/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,19 @@ type FileBackend struct {
parallel uint32
}

// NewFileBackend connects to indices stored at `dir`, creating the directory if one doesn't
// exist, and uses `parallel` to control how many workers to use when flushing to disk.
func NewFileBackend(dir string, parallel uint32) (*FileBackend, error) {
if parallel <= 0 {
parallel = 1
}

err := os.MkdirAll(dir, fs.ModeDir|0755)
if err != nil {
log.Errorf("Unable to mkdir %s, %v", dir, err)
return nil, err
}

return &FileBackend{
dir: dir,
parallel: parallel,
Expand Down
6 changes: 5 additions & 1 deletion exp/lighthorizon/index/cmd/batch/map/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type BatchConfig struct {

const (
batchSizeEnv = "BATCH_SIZE"
jobIndexEnv = "AWS_BATCH_JOB_ARRAY_INDEX"
jobIndexEnvName = "JOB_INDEX_ENV"
firstCheckpointEnv = "FIRST_CHECKPOINT"
txmetaSourceUrlEnv = "TXMETA_SOURCE"
indexTargetUrlEnv = "INDEX_TARGET"
Expand All @@ -39,6 +39,10 @@ func NewBatchConfig() (*BatchConfig, error) {
return nil, errors.New("required parameter: " + indexTargetUrlEnv)
}

jobIndexEnv := os.Getenv(jobIndexEnvName)
if jobIndexEnv == "" {
return nil, errors.New("env variable can't be empty " + jobIndexEnvName)
}
jobIndex, err := strconv.ParseUint(os.Getenv(jobIndexEnv), 10, 32)
if err != nil {
return nil, errors.Wrap(err, "invalid parameter "+jobIndexEnv)
Expand Down
7 changes: 6 additions & 1 deletion exp/lighthorizon/index/cmd/batch/reduce/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,16 @@ func ReduceConfigFromEnvironment() (*ReduceConfig, error) {
mapJobsEnv = "MAP_JOB_COUNT"
reduceJobsEnv = "REDUCE_JOB_COUNT"
workerCountEnv = "WORKER_COUNT"
jobIndexEnv = "AWS_BATCH_JOB_ARRAY_INDEX"
jobIndexEnvName = "JOB_INDEX_ENV"
indexRootSourceEnv = "INDEX_SOURCE_ROOT"
indexTargetEnv = "INDEX_TARGET"
)

jobIndexEnv := os.Getenv(jobIndexEnvName)
if jobIndexEnv == "" {
return nil, errors.New("env variable can't be empty " + jobIndexEnvName)
}

jobIndex, err := strconv.ParseUint(strings.TrimSpace(os.Getenv(jobIndexEnv)), 10, 32)
if err != nil {
return nil, errors.Wrap(err, "invalid parameter "+jobIndexEnv)
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/cmd/map.sh
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ for (( i=0; i < $BATCH_COUNT; i++ ))
do
echo -n "Creating map job $i... "

NETWORK_PASSPHRASE='testnet' MODULES='accounts_unbacked,transactions' \
NETWORK_PASSPHRASE='testnet' JOB_INDEX_ENV='AWS_BATCH_JOB_ARRAY_INDEX' MODULES='accounts_unbacked,transactions' \
AWS_BATCH_JOB_ARRAY_INDEX=$i BATCH_SIZE=$BATCH_SIZE FIRST_CHECKPOINT=$FIRST \
TXMETA_SOURCE=file:https://$1 INDEX_TARGET=file:https://$2 WORKER_COUNT=1 \
./map &
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/cmd/reduce.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ for (( i=0; i < $REDUCE_JOB_COUNT; i++ ))
do
echo -n "Creating reduce job $i... "

AWS_BATCH_JOB_ARRAY_INDEX=$i MAP_JOB_COUNT=$MAP_JOB_COUNT \
AWS_BATCH_JOB_ARRAY_INDEX=$i JOB_INDEX_ENV="AWS_BATCH_JOB_ARRAY_INDEX" MAP_JOB_COUNT=$MAP_JOB_COUNT \
REDUCE_JOB_COUNT=$REDUCE_JOB_COUNT WORKER_COUNT=4 \
INDEX_SOURCE_ROOT=file:https://$1 INDEX_TARGET=file:https://$2 \
timeout -k 30s 10s ./reduce &
Expand Down

0 comments on commit 4838973

Please sign in to comment.