#!/bin/bash
# vim: sw=4 et

# Print the command-line help on stderr and terminate the script.
#
# Outputs:   help text on stderr
# Returns:   does not return; exits the shell with status 1
usage() {
    # Strip the directory part of $0 without forking or word-splitting.
    local prog=${0##*/}
    # NOTE: the heredoc is unquoted so that $prog expands; a literal dollar
    # sign therefore has to be escaped (see \$WORKDIR below).
    cat 1>&2 <<EOF
usage: $prog startup WORKDIR [ENVIRONMENT]
       $prog shutdown WORKDIR [ENVIRONMENT]

positional arguments:
  WORKDIR       Working directory to use.
  ENVIRONMENT   Script to source to obtain the right environment.
                Will be automatically looked for in \$WORKDIR.

environment variables:
  SM_MASTER_MEMORY      Memory to dedicate to the master. Will be
                        subtracted from the detected machine memory when
                        calculating the memory allocation for workers.

                        Can be set by the user, and is specified in MB.
                        Defaults to 4096.

  SM_WORKDIR            The WORKDIR exported and accessible to the
                        ENVIRONMENT script.

  SM_WORKER_CORES       Cores to allot to a worker.

  SM_WORKER_MEMORY      Memory to allot to a worker.

  SM_WORKER_COUNT       Limit number of workers.

  SM_EXECUTE            Command to execute once the cluster has started.

allocate resources with SLURM via:

    salloc -Aproj16 -pinteractive --exclusive --time 10:00:00 -N4

or using COBALT with:

    qsub -A PlasticNeocortex -t 60 -n 8 -I
EOF
    exit 1
}

# Report which batch system the script is running under.
#
# Globals:   SLURM_JOBID, COBALT_NODEFILE (read)
# Outputs:   "slurm" if SLURM_JOBID is non-empty, otherwise "cobalt" if
#            COBALT_NODEFILE is non-empty, otherwise nothing
_detect_allocation() {
    if [[ -n "$SLURM_JOBID" ]]; then
        printf '%s\n' slurm
    elif [[ -n "$COBALT_NODEFILE" ]]; then
        printf '%s\n' cobalt
    fi
}

# Print the MemAvailable value of a meminfo file divided by 1024, as a whole
# number (the caller logs the result as MiB).
#
# The value is truncated to an integer because the caller feeds it into
# shell integer arithmetic and into "<number>m" memory settings; awk's default
# number formatting would otherwise emit fractions or exponent notation.
#
# Arguments: $1 - meminfo file to parse (optional, defaults to /proc/meminfo)
# Outputs:   one integer on stdout; nothing if the file has no MemAvailable
_detect_memory() {
    awk '/^MemAvailable:/ { printf "%d\n", $2 / 1024; exit }' "${1:-/proc/meminfo}"
}

# Start the Spark master on the current host and record its URL.
#
# Globals:   SM_WORKDIR, SM_MASTER_MEMORY, SPARK_DAEMON_MEMORY,
#            SPARK_MASTER_IP (exported); SPARK_HOME, SPARK_MASTER_PORT (read)
# Arguments: $1 - working directory (required; usage is shown if empty)
#            $2 - environment script to source (optional)
# Outputs:   writes "spark://<hostname>:<port>" to $1/spark_master
# Returns:   exit status of start-master.sh
_cobalt_start_master() {
    local workdir=$1
    local envscript=$2
    [[ -z "$workdir" ]] && usage

    export SM_WORKDIR="$workdir"

    # Only source regular files, like the other entry points of this script.
    if [[ -f "$envscript" ]]; then
        . "$envscript"
    fi

    export SM_MASTER_MEMORY=${SM_MASTER_MEMORY:-4096}
    export SPARK_DAEMON_MEMORY=${SM_MASTER_MEMORY}m
    SPARK_MASTER_IP=$(hostname)
    export SPARK_MASTER_IP

    # The URL file is what workers read to find the master.
    echo "spark://$SPARK_MASTER_IP:${SPARK_MASTER_PORT:-7077}" > "$workdir/spark_master"

    "$SPARK_HOME/sbin/start-master.sh"
}

# Start a Spark worker on the current host and attach it to the master whose
# URL is stored in WORKDIR/spark_master.
#
# Globals:   SM_WORKDIR (exported); SPARK_HOME (read)
# Arguments: $1 - working directory (required; usage is shown if empty)
#            $2 - environment script to source (optional)
# Returns:   exits the shell with a non-zero status if the master URL file
#            cannot be read or start-slave.sh fails
_cobalt_start_worker() {
    local workdir=$1
    local envscript=$2
    local master_url
    [[ -z "$workdir" ]] && usage

    export SM_WORKDIR="$workdir"

    if [[ -f "$envscript" ]]; then
        . "$envscript"
    fi

    if [[ ! -r "$workdir/spark_master" ]]; then
        echo "!!! Cannot read master URL from '$workdir/spark_master'" >&2
        exit 1
    fi
    master_url=$(cat "$workdir/spark_master")

    echo "> Connecting to ${master_url}"

    "$SPARK_HOME/sbin/start-slave.sh" "$master_url" || exit $?
}

# Bring up a Spark cluster on the hosts listed in COBALT_NODEFILE.
#
# If WORKDIR/spark_master names a host on which `jps` still reports a running
# Spark master, that master is reused. Otherwise this script is copied into
# WORKDIR and a master is started on the first host of the node file. A worker
# is then started on every host of the node file.
#
# Globals:   COBALT_NODEFILE (read)
# Arguments: $1 - working directory (required; usage is shown if empty)
#            $2 - environment script handed to the remote processes (optional)
_cobalt_start_cluster() {
    local workdir=$1
    local envscript=$2
    local script self master host
    [[ -z "$workdir" ]] && usage

    script=$(readlink -f "$0")
    self=${script##*/}
    workdir=$(readlink -f "$workdir")
    if [[ -n "$envscript" ]]; then
        envscript=$(readlink -f "$envscript")
    fi

    if [[ ! -d "$workdir" ]]; then
        echo ">> Creating working directory '$workdir'"
        mkdir -p "$workdir"
    fi

    # Extract the dotted host name from a previously written master URL.
    master=
    if [[ -f "$workdir/spark_master" ]]; then
        master=$(grep -oe '\(\w*\.\)\{1,\}\w*' < "$workdir/spark_master")
    fi

    # Forget the recorded master unless a master JVM is really running there.
    if [[ -n "$master" ]] \
        && ! ssh "$master" jps -lm 2> /dev/null < /dev/null \
            | grep -q org.apache.spark.deploy.master.Master; then
        master=
    fi

    # The remote command line is built with %q because ssh joins its
    # arguments with spaces and lets the remote shell split them again.
    if [[ -z "$master" ]]; then
        echo ">> Copying myself to working directory '$workdir'"
        cp "$script" "$workdir"

        master=$(head -n 1 "$COBALT_NODEFILE")
        ssh "$master" \
            "$(printf '%q ' "$workdir/$self" _cobalt_start_master "$workdir" "$envscript")" \
            < /dev/null
    else
        echo "<< Connecting to master $master"
    fi

    # ssh must not inherit the loop's stdin, or it would swallow the
    # remaining lines of the node file after the first host.
    while read -r host || [[ -n "$host" ]]; do
        [[ -z "$host" ]] && continue
        ssh "$host" \
            "$(printf '%q ' "$workdir/$self" _cobalt_start_worker "$workdir" "$envscript")" \
            < /dev/null
    done < "$COBALT_NODEFILE"
}

# Stop the Spark master on the current host.
#
# When stop-master.sh reports that there was nothing "to stop", any remaining
# java processes are killed as a last resort.
#
# Globals:   SM_WORKDIR (exported); SPARK_HOME (read)
# Arguments: $1 - working directory (required; usage is shown if empty)
#            $2 - environment script to source (optional)
# Outputs:   the output of stop-master.sh
_cobalt_stop_master() {
    local workdir=$1
    local envscript=$2
    local result
    [[ -z "$workdir" ]] && usage

    export SM_WORKDIR="$workdir"

    if [[ -f "$envscript" ]]; then
        . "$envscript"
    fi

    result=$("$SPARK_HOME/sbin/stop-master.sh")
    echo "$result"
    if [[ $result == *"to stop"* ]]; then
        # NOTE(review): this kills every java process pkill can match, not
        # only Spark daemons - kept from the original behavior.
        if pgrep java > /dev/null; then
            pkill -9 java
        fi
    fi
}

# Stop the Spark worker on the current host.
#
# When stop-slave.sh reports that there was nothing "to stop", any remaining
# java processes are killed as a last resort.
#
# Globals:   SM_WORKDIR (exported); SPARK_HOME (read)
# Arguments: $1 - working directory (required; usage is shown if empty)
#            $2 - environment script to source (optional)
# Outputs:   the output of stop-slave.sh
_cobalt_stop_worker() {
    # Argument order matches _cobalt_stop_cluster, which passes the working
    # directory first and the environment script second.
    local workdir=$1
    local envscript=$2
    local result
    [[ -z "$workdir" ]] && usage

    export SM_WORKDIR="$workdir"

    if [[ -f "$envscript" ]]; then
        . "$envscript"
    fi

    result=$("$SPARK_HOME/sbin/stop-slave.sh")
    echo "$result"
    if [[ $result == *"to stop"* ]]; then
        # NOTE(review): this kills every java process pkill can match, not
        # only Spark daemons - kept from the original behavior.
        if pgrep java > /dev/null; then
            pkill -9 java
        fi
    fi
}

# Stop the Spark master on the first host of COBALT_NODEFILE and the workers
# on every host listed in it.
#
# Globals:   COBALT_NODEFILE (read)
# Arguments: $1 - working directory (required; usage is shown if empty)
#            $2 - environment script handed to the remote processes (optional)
_cobalt_stop_cluster() {
    local workdir=$1
    local envscript=$2
    local script master host
    [[ -z "$workdir" ]] && usage

    script=$(readlink -f "$0")
    workdir=$(readlink -f "$workdir")
    if [[ -n "$envscript" ]]; then
        envscript=$(readlink -f "$envscript")
    fi

    # The remote command line is built with %q because ssh joins its
    # arguments with spaces and lets the remote shell split them again.
    master=$(head -n 1 "$COBALT_NODEFILE")
    ssh "$master" \
        "$(printf '%q ' "$script" _cobalt_stop_master "$workdir" "$envscript")" \
        < /dev/null

    # ssh must not inherit the loop's stdin, or it would swallow the
    # remaining lines of the node file after the first host.
    while read -r host || [[ -n "$host" ]]; do
        [[ -z "$host" ]] && continue
        ssh "$host" \
            "$(printf '%q ' "$script" _cobalt_stop_worker "$workdir" "$envscript")" \
            < /dev/null
    done < "$COBALT_NODEFILE"
}

# Sleep for the number of seconds between now and the EndTime that
# `scontrol show job` reports for the current job.
#
# Globals:   SLURM_JOBID (read)
# Outputs:   a progress line on stdout; an error on stderr if the remaining
#            time cannot be determined
# Returns:   exit status of sleep, or 1 if no usable duration was computed
_slurm_sleep() {
    local remaining
    remaining=$(
        scontrol show job "$SLURM_JOBID" | \
            ruby -rdate -ne "
                if (m = \$_.match(%r{EndTime=(.*)Deadline}))
                    diff = DateTime.parse(m[1] + ' ' + DateTime.now.zone) - DateTime.now()
                    puts (diff * (24*60*60)).to_i
                end"
    )
    # Without this guard an empty or negative value would only surface as a
    # confusing usage error from sleep.
    if [[ ! "$remaining" =~ ^[0-9]+$ ]]; then
        echo "!!! Cannot determine the remaining allocation time" >&2
        return 1
    fi
    echo "> Sleeping $remaining seconds"
    sleep "$remaining"
}

# Bring up a Spark cluster inside the current SLURM allocation.
#
# Steps: optionally create per-job SPARK_LOCAL_DIRS under /nvme (removed
# again when the script exits), reuse the master recorded in
# WORKDIR/spark_master if `jps` still reports it, otherwise copy this script
# into WORKDIR, then launch _slurm_start_processes on the allocation through
# srun. With SM_EXECUTE set, wait for WORKDIR/done and exit with the status
# stored in it.
#
# Globals:   SPARK_LOCAL_DIRS, CLEANUP (may be exported); SLURM_JOBID,
#            SM_EXECUTE (read)
# Arguments: $1 - working directory (required; usage is shown if empty)
#            $2 - environment script handed to the started processes (optional)
_slurm_start_cluster() {
    local workdir=$1
    local envscript=$2
    local script self nvme master code
    [[ -z "$workdir" ]] && usage

    script=$(readlink -f "$0")
    self=${script##*/}
    workdir=$(readlink -f "$workdir")
    if [[ -n "$envscript" ]]; then
        envscript=$(readlink -f "$envscript")
    fi

    nvme=$(srun ls / | grep nvme)

    if [[ -z "$SPARK_LOCAL_DIRS" && -n "$nvme" ]]; then
        export SPARK_LOCAL_DIRS="/nvme/$(whoami)/$SLURM_JOBID"
        export CLEANUP=Y
        echo ">> Creating SPARK_LOCAL_DIRS='$SPARK_LOCAL_DIRS'"
        srun mkdir -p "$SPARK_LOCAL_DIRS"
        # SIGKILL cannot be trapped, so only EXIT is registered. The
        # directory name is expanded now, on purpose.
        trap "echo '>> Cleaning up $SPARK_LOCAL_DIRS';srun rm -rf \"$SPARK_LOCAL_DIRS\"" EXIT
    fi

    # Extract the dotted host name from a previously written master URL.
    master=none
    if [[ -f "$workdir/spark_master" ]]; then
        master=$(grep -oe '\(\w*\.\)\{1,\}\w*' < "$workdir/spark_master")
        [[ -z "$master" ]] && master=none
    fi

    # Forget the recorded master unless a master JVM is really running there.
    if [[ "$master" != "none" ]] \
        && ! ssh "$master" jps -lm 2> /dev/null < /dev/null \
            | grep -q org.apache.spark.deploy.master.Master; then
        master=none
    fi
    if [[ "$master" = "none" ]]; then
        rm -f "$workdir/spark_master"
    fi

    if [[ ! -d "$workdir" ]]; then
        echo ">> Creating working directory '$workdir'"
        mkdir -p "$workdir"
    fi

    if [[ "$master" = "none" ]]; then
        echo ">> Copying myself to working directory '$workdir'"
        cp "$script" "$workdir"
    else
        echo "<< Connecting to master $master"
    fi

    if [[ -n "$SM_EXECUTE" ]]; then
        rm -f "$workdir/done"
    fi

    srun "$workdir/$self" _slurm_start_processes "$workdir" "$master" "$envscript"
    if [[ -n "$SM_EXECUTE" ]]; then
        # The task that ran SM_EXECUTE stores its exit status in this file.
        while [[ ! -f "$workdir/done" ]]; do sleep 10s; done
        code=$(cat "$workdir/done")
        echo ">> Exiting with $code"
        exit "${code:-0}"
    fi
}

# SLURM clusters are not shut down by this script; tell the user how to
# do it instead.
_slurm_stop_cluster() {
    printf '%s\n' '!!! Just kill the allocation'
}

# Per-task entry point launched through srun: start the Spark master (task 0
# only, when no master exists yet) and a Spark worker, then either run
# SM_EXECUTE, wait for it to finish, or sleep until the allocation ends.
#
# Globals:   SM_WORKDIR, SM_MASTER_MEMORY, SPARK_DAEMON_MEMORY,
#            SPARK_MASTER_IP, SPARK_WORKER_CORES, SPARK_WORKER_MEMORY
#            (exported); MASTER_NODE (set); SPARK_HOME, SPARK_MASTER_PORT,
#            SM_WORKER_*, SM_EXECUTE, SLURM_* (read)
# Arguments: $1 - working directory (required; usage is shown if empty)
#            $2 - host name or spark:// URL of a running master, or "none"
#            $3 - environment script to source (optional)
# Returns:   exits with 1 if a start script reports "failed"; exits with 0
#            for tasks beyond SM_WORKER_COUNT
_slurm_start_processes() {
    local workdir=$1
    local master=$2
    local envscript=$3
    local output mem node_mem detected
    [[ -z "$workdir" ]] && usage

    export SM_WORKDIR="$workdir"

    if [[ -f "$envscript" ]]; then
        . "$envscript"
    fi

    export SM_MASTER_MEMORY=${SM_MASTER_MEMORY:-4096}

    if [[ "$master" = "none" && ${SLURM_PROCID:-0} -eq 0 ]]; then
        export SPARK_DAEMON_MEMORY=${SM_MASTER_MEMORY}m
        SPARK_MASTER_IP=$(hostname)
        export SPARK_MASTER_IP

        output=$("$SPARK_HOME/sbin/start-master.sh")
        echo "$output"
        # Written even on failure so that tasks waiting for the file wake up.
        echo "spark://$SPARK_MASTER_IP:${SPARK_MASTER_PORT:-7077}" > "$workdir/spark_master"
        if [[ "$output" == *failed* ]]; then
            exit 1
        fi
    fi

    if [[ "$master" = "none" ]]; then
        master=
    fi

    if [[ -n "$SM_WORKER_COUNT" && ${SLURM_PROCID:-0} -ge $SM_WORKER_COUNT ]]; then
        exit 0
    fi

    while [[ ! -f "$workdir/spark_master" ]]; do sleep 0.5s; done

    # Worker memory in MB: explicit setting, or node memory minus the share
    # reserved for the master.
    if [[ -n "$SM_WORKER_MEMORY" ]]; then
        mem=$SM_WORKER_MEMORY
    else
        node_mem=${SLURM_MEM_PER_NODE:-$(( ${SLURM_CPUS_ON_NODE:-0} * ${SLURM_MEM_PER_CPU:-0} ))}
        mem=$(( node_mem - SM_MASTER_MEMORY ))
    fi

    # Keep only the integer part of the detected value; anything that is not
    # a plain decimal number is ignored instead of breaking the arithmetic.
    detected=$(_detect_memory)
    if [[ "$detected" =~ ^([0-9]+)([.][0-9]*)?$ ]]; then
        detected=${BASH_REMATCH[1]}
    else
        detected=
    fi

    # Clamp to what the machine offers; also covers a non-positive result,
    # e.g. when SLURM reports no per-node memory.
    if [[ -n "$detected" ]] && (( mem > detected || mem <= 0 )); then
        echo "> fixing memory to ${detected} MiB"
        mem=$detected
    fi

    export SPARK_WORKER_CORES=${SM_WORKER_CORES:-${SLURM_CPUS_PER_TASK:-$SLURM_CPUS_ON_NODE}}
    export SPARK_WORKER_MEMORY=${mem}m

    # The caller may hand over a bare host name; the worker start script is
    # given a full spark://host:port URL.
    if [[ -n "$master" && "$master" != spark://* ]]; then
        master="spark://${master}:${SPARK_MASTER_PORT:-7077}"
    fi
    MASTER_NODE=${master:-spark://$(scontrol show hostname "$SLURM_NODELIST" | head -n 1):${SPARK_MASTER_PORT:-7077}}

    echo "> Running workers with ${SPARK_WORKER_CORES} cores and ${SPARK_WORKER_MEMORY} memory"
    echo "> Connecting to ${MASTER_NODE}"

    output=$("$SPARK_HOME/sbin/start-slave.sh" "$MASTER_NODE")
    echo "$output"
    if [[ "$output" == *failed* ]]; then
        exit 1
    fi

    if [[ -n "$SM_EXECUTE" && ${SLURM_PROCID:-0} -eq 0 ]]; then
        echo ">> Sleeping 5s for cluster startup"
        sleep 5
        echo ">> Running command $SM_EXECUTE"
        # NOTE(review): SM_EXECUTE is evaluated as shell code by design; it
        # must come from a trusted source.
        eval "$SM_EXECUTE"
        echo $? > "$workdir/done"
    elif [[ -n "$SM_EXECUTE" ]]; then
        while [[ ! -f "$workdir/done" ]]; do sleep 10s; done
    else
        _slurm_sleep
    fi
}

# User command: start a cluster on whichever batch system is detected.
#
# Arguments: $1 - working directory (created if missing)
#            $2 - environment script; when empty, WORKDIR/env.sh and then
#                 WORKDIR/spark-env.sh are tried
# Outputs:   status messages on stdout
# Returns:   status of the batch-specific _<batch>_start_cluster function;
#            usage is invoked when no batch system or no WORKDIR is given
startup() {
    local workdir=$1 envscript=$2
    local batch candidate
    batch=$(_detect_allocation)

    [[ -n "$batch" ]] || {
        echo "!!! Cannot detect batch system"
        usage
    }

    # First existing default wins; an explicit ENVIRONMENT is never replaced.
    if [[ -z "$envscript" ]]; then
        for candidate in env.sh spark-env.sh; do
            if [[ -f "$workdir/$candidate" ]]; then
                envscript="$workdir/$candidate"
                break
            fi
        done
    fi

    [[ -n "$workdir" ]] || usage

    [[ -d "$workdir" ]] || mkdir -p "$workdir"

    echo ">>> Running on ${batch}"
    "_${batch}_start_cluster" "$workdir" "$envscript"
}

# User command: stop the cluster on whichever batch system is detected.
#
# Arguments: $1 - working directory
#            $2 - environment script; when empty, WORKDIR/env.sh and then
#                 WORKDIR/spark-env.sh are tried
# Outputs:   status messages on stdout
# Returns:   status of the batch-specific _<batch>_stop_cluster function;
#            usage is invoked when no batch system or no WORKDIR is given
shutdown() {
    local workdir=$1 envscript=$2
    local batch candidate
    batch=$(_detect_allocation)

    [[ -n "$batch" ]] || {
        echo "!!! Cannot detect batch system"
        usage
    }

    # First existing default wins; an explicit ENVIRONMENT is never replaced.
    if [[ -z "$envscript" ]]; then
        for candidate in env.sh spark-env.sh; do
            if [[ -f "$workdir/$candidate" ]]; then
                envscript="$workdir/$candidate"
                break
            fi
        done
    fi

    [[ -n "$workdir" ]] || usage

    echo ">>> Running on ${batch}"
    "_${batch}_stop_cluster" "$workdir" "$envscript"
}

cmd=$1
shift

[ -z "$(type $cmd 2> /dev/null|grep function)" ] && usage
$cmd $*
