#!/bin/bash
# vim: sw=4 et

# Turn on command tracing when SM_VERBOSE holds any non-empty value.
case "${SM_VERBOSE}" in
    "") ;;
    *) set -x ;;
esac

# Print the help text to stderr and terminate the script with status 1.
#
# The here-document is unquoted so that escapes are processed; anything that
# must appear literally (dollar signs, backticks) is therefore backslash-escaped.
usage() {
    cat 1>&2 <<EOF
usage: sm_run [-c SM_WORKER_CORES] [-m SM_WORKER_MEMORY] [-H|-h HADOOP_HOME] [-w WORKDIR] [-e ENVIRONMENT] COMMAND ARGS…
       sm_startup WORKDIR [ENVIRONMENT]
       sm_shutdown

special options:
  -H   Disable HADOOP support.

positional arguments:
  WORKDIR       Working directory to use. Defaults to ./_cluster.
  ENVIRONMENT   Script to source to obtain the right environment.
                Will be automatically looked for in \$WORKDIR, \$SPARK_CONF_DIR, \$SPARK_HOME.

environment variables to configure behavior:
  HADOOP_HOME           If set, will spawn a HADOOP cluster.

  SM_MASTER_MEMORY      Memory to dedicate to the master. Will be
                        subtracted from the detected machine memory when
                        calculating the memory allocation for workers.

                        Can be set by the user, and is specified in MB.
                        Defaults to 4096.

  SM_WORKDIR            The WORKDIR exported and accessible to the
                        ENVIRONMENT script.

  SM_WORKER_CORES       Cores to allot to a worker.

  SM_WORKER_MEMORY      Memory to allot to a worker.

  SM_WORKER_COUNT       Limit number of workers.

  SM_HADOOP_COUNT       Limit number of Hadoop name nodes.

  SM_EXECUTE            Command to execute once the cluster has started.

  SM_VERBOSE            Print all commands before executing them, via \`set -x\`.

allocate resources with SLURM via:

    salloc -Aproj16 -pinteractive --exclusive --time 10:00:00 -N4

or using COBALT with:

    qsub -A PlasticNeocortex -t 60 -n 8 -I
EOF
    exit 1
}

# Report which batch system the current environment belongs to.
#
# Prints "slurm" when SLURM_JOBID is non-empty, otherwise "cobalt" when
# COBALT_NODEFILE is non-empty, otherwise prints nothing. SLURM wins when
# both are set.
_detect_allocation() {
    if [[ -n "${SLURM_JOBID}" ]]; then
        printf '%s\n' slurm
        return
    fi
    if [[ -n "${COBALT_NODEFILE}" ]]; then
        printf '%s\n' cobalt
    fi
}

# Print the MemAvailable figure from /proc/meminfo divided by 1024 and
# truncated to an integer (callers in this file treat the result as MiB).
_detect_memory() {
    local -r extract_mib='/MemAvailable/ { mib = int($2 / 1024); print mib }'
    awk "${extract_mib}" /proc/meminfo
}

# Start the Spark master on the current host (COBALT flavour).
#
# Arguments:
#   $1 - working directory (required; usage is shown when empty)
#   $2 - optional environment script, sourced if it is a regular file
# Globals written: SM_WORKDIR, SM_MASTER_MEMORY, SPARK_DAEMON_MEMORY,
#                  SPARK_MASTER_IP (all exported)
# Side effects: writes the master URL to "$workdir/spark_master" and then runs
#               $SPARK_HOME/sbin/start-master.sh, whose status is returned.
_cobalt_start_master() {
    local workdir=$1
    local envscript=$2
    [[ -z "$workdir" ]] && usage

    export SM_WORKDIR="$workdir"

    # -f (regular file) matches the check used by the other start/stop helpers;
    # the path is quoted so that directories containing spaces still work.
    [[ -f "$envscript" ]] && . "$envscript"

    export SM_MASTER_MEMORY=${SM_MASTER_MEMORY:-4096}
    export SPARK_DAEMON_MEMORY=${SM_MASTER_MEMORY}m
    export SPARK_MASTER_IP=$(hostname)

    # Workers read this file to find the master they should connect to.
    echo "spark://$SPARK_MASTER_IP:${SPARK_MASTER_PORT:-7077}" > "$workdir/spark_master"

    "$SPARK_HOME/sbin/start-master.sh"
}

# Start a Spark worker on the current host and attach it to the master whose
# URL is stored in "$workdir/spark_master" (COBALT flavour).
#
# Arguments:
#   $1 - working directory (required; usage is shown when empty)
#   $2 - optional environment script, sourced if it is a regular file
# Globals written: SM_WORKDIR (exported)
# Exits the shell with the failing status when the master URL cannot be read
# or when start-slave.sh fails.
_cobalt_start_worker() {
    local workdir=$1
    local envscript=$2
    [[ -z "$workdir" ]] && usage

    export SM_WORKDIR="$workdir"

    [[ -f "$envscript" ]] && . "$envscript"

    local master_url
    master_url=$(cat "$workdir/spark_master")
    if [[ -z "$master_url" ]]; then
        echo "!!! No master URL found in '$workdir/spark_master'" >&2
        exit 1
    fi

    echo "> Connecting to ${master_url}"

    "$SPARK_HOME/sbin/start-slave.sh" "$master_url" || exit $?
}

# Bring up a Spark cluster on the nodes listed in $COBALT_NODEFILE.
#
# Arguments:
#   $1 - working directory (required; usage is shown when empty)
#   $2 - optional environment script forwarded to the remote helpers
#
# If "$workdir/spark_master" names a host on which `jps` still reports a Spark
# master, that master is reused. Otherwise a master is started on the first
# host of the node file. A worker is then started on every host in the file.
#
# The script is copied to "$workdir/sm_cluster": the dispatcher at the bottom
# of this file only treats its first argument as a function name when the
# script is invoked under that name.
# NOTE(review): arguments are passed through ssh, which joins them with
# spaces, so paths containing whitespace are not supported remotely.
_cobalt_start_cluster() {
    local workdir=$1
    local envscript=$2
    [[ -z "$workdir" ]] && usage

    local script remote master host
    script=$(readlink -f "$0")
    workdir=$(readlink -f "$workdir")
    # readlink fails on an empty operand, so only resolve a given script.
    [[ -n "$envscript" ]] && envscript=$(readlink -f "$envscript")
    remote="$workdir/sm_cluster"

    master=
    if [[ -f "$workdir/spark_master" ]]; then
        master=$(grep -oe '\(\w*\.\)\{1,\}\w*' < "$workdir/spark_master")
    fi

    # Forget the recorded master when no master JVM is running there any more.
    if [[ -n "$master" ]] &&
        ! ssh "$master" jps -lm 2>/dev/null < /dev/null |
            grep -q org.apache.spark.deploy.master.Master; then
        master=
    fi

    if [[ ! -d "$workdir" ]]; then
        echo ">> Creating working directory '$workdir'"
        mkdir -p "$workdir"
    fi

    if [[ ! "$script" -ef "$remote" ]] && [[ -z "$master" || ! -x "$remote" ]]; then
        echo ">> Copying myself to working directory '$workdir'"
        cp "$script" "$remote"
    fi

    if [[ -z "$master" ]]; then
        master=$(head -n 1 "$COBALT_NODEFILE")
        ssh "$master" "$remote" _cobalt_start_master "$workdir" "$envscript" < /dev/null
    else
        echo "<< Connecting to master $master"
    fi

    # stdin of ssh is redirected so it cannot swallow the remaining host names.
    while IFS= read -r host; do
        [[ -n "$host" ]] || continue
        ssh "$host" "$remote" _cobalt_start_worker "$workdir" "$envscript" < /dev/null
    done < "$COBALT_NODEFILE"
}

# Stop the Spark master on the current host (COBALT flavour).
#
# Arguments:
#   $1 - working directory (required; usage is shown when empty)
#   $2 - optional environment script, sourced if it is a regular file
# Globals written: SM_WORKDIR (exported)
# Outputs: the output of $SPARK_HOME/sbin/stop-master.sh on stdout.
_cobalt_stop_master() {
    local workdir=$1
    local envscript=$2
    [[ -z "$workdir" ]] && usage

    export SM_WORKDIR="$workdir"

    [[ -f "$envscript" ]] && . "$envscript"

    local result
    result=$("$SPARK_HOME/sbin/stop-master.sh")
    printf '%s\n' "$result"
    # When the stop script's output contains "to stop", fall back to killing
    # every java process on this node. pkill itself reports (status 1) when
    # nothing matched, so no separate process listing is needed.
    if [[ $result == *"to stop"* ]]; then
        pkill -9 java
    fi
}

# Stop the Spark worker on the current host (COBALT flavour).
#
# Arguments (same order as every other helper, and as passed by
# _cobalt_stop_cluster):
#   $1 - working directory (required; usage is shown when empty)
#   $2 - optional environment script, sourced if it is a regular file
# Globals written: SM_WORKDIR (exported)
# Outputs: the output of $SPARK_HOME/sbin/stop-slave.sh on stdout.
_cobalt_stop_worker() {
    local workdir=$1
    local envscript=$2
    [[ -z "$workdir" ]] && usage

    export SM_WORKDIR="$workdir"

    [[ -f "$envscript" ]] && . "$envscript"

    local result
    result=$("$SPARK_HOME/sbin/stop-slave.sh")
    printf '%s\n' "$result"
    # When the stop script's output contains "to stop", fall back to killing
    # every java process on this node. pkill itself reports (status 1) when
    # nothing matched, so no separate process listing is needed.
    if [[ $result == *"to stop"* ]]; then
        pkill -9 java
    fi
}

# Stop the Spark master and all workers on the nodes in $COBALT_NODEFILE.
#
# Arguments:
#   $1 - working directory (required; usage is shown when empty)
#   $2 - optional environment script forwarded to the remote helpers
#
# The master is stopped on the first host of the node file, then a worker is
# stopped on every host in the file.
_cobalt_stop_cluster() {
    local workdir=$1
    local envscript=$2
    [[ -z "$workdir" ]] && usage

    local script master host
    script=$(readlink -f "$0")
    workdir=$(readlink -f "$workdir")
    # readlink fails on an empty operand, so only resolve a given script.
    [[ -n "$envscript" ]] && envscript=$(readlink -f "$envscript")

    # The dispatcher at the bottom of this file only accepts a function name
    # as first argument when the script is called sm_cluster, so prefer such
    # a copy in the working directory when one is available.
    [[ -x "$workdir/sm_cluster" ]] && script="$workdir/sm_cluster"

    master=$(head -n 1 "$COBALT_NODEFILE")
    ssh "$master" "$script" _cobalt_stop_master "$workdir" "$envscript" < /dev/null
    # stdin of ssh is redirected so it cannot swallow the remaining host names.
    while IFS= read -r host; do
        [[ -n "$host" ]] || continue
        ssh "$host" "$script" _cobalt_stop_worker "$workdir" "$envscript" < /dev/null
    done < "$COBALT_NODEFILE"
}

# Sleep until the end time SLURM reports for the current job.
#
# The number of seconds is derived from the "EndTime=...Deadline" field of
# `scontrol show job $SLURM_JOBID`, parsed by a small ruby snippet.
# Returns 1 without sleeping when no non-negative integer could be obtained
# (field missing, tools unavailable, or end time already passed).
_slurm_sleep() {
    local remaining
    remaining=$(
        scontrol show job "$SLURM_JOBID" |
            ruby -rdate -ne "
                if (m = \$_.match(%r{EndTime=(.*)Deadline}))
                    diff = DateTime.parse(m[1] + ' ' + DateTime.now.zone) - DateTime.now()
                    puts (diff * (24*60*60)).to_i
                end"
    )
    if [[ ! "$remaining" =~ ^[0-9]+$ ]]; then
        echo "> Cannot determine remaining job time, not sleeping" >&2
        return 1
    fi
    echo "> Sleeping $remaining seconds"
    sleep "$remaining"
}

# Bring up a Spark cluster inside the current SLURM allocation.
#
# Arguments:
#   $1 - working directory (required; usage is shown when empty)
#   $2 - optional environment script forwarded to the per-task helper
# Globals read:    SPARK_LOCAL_DIRS, SLURM_JOBID, SM_EXECUTE
# Globals written: SPARK_LOCAL_DIRS, SPARK_WORKER_DIR (exported)
#
# A copy of this script is placed at "$workdir/sm_cluster" and launched on all
# tasks via `srun ... _slurm_start_processes`. When SM_EXECUTE is set, the
# function waits for "$workdir/done" and exits the shell with the status
# stored in that file.
_slurm_start_cluster() {
    local workdir=$1
    local envscript=$2
    [[ -z "$workdir" ]] && usage

    local myself script nvme master code
    myself=$(readlink -f "$0")
    script=sm_cluster
    workdir=$(readlink -f "$workdir")
    # readlink fails on an empty operand, so only resolve a given script.
    [[ -n "$envscript" ]] && envscript=$(readlink -f "$envscript")

    nvme=$(srun ls / | grep nvme)

    if [[ -z "$SPARK_LOCAL_DIRS" && -n "$nvme" ]]; then
        export SPARK_LOCAL_DIRS="/nvme/$(whoami)/$SLURM_JOBID/spark-local"
        export SPARK_WORKER_DIR="/nvme/$(whoami)/$SLURM_JOBID/spark-worker"
        echo ">> Creating SPARK_LOCAL_DIRS='/nvme/$(whoami)/$SLURM_JOBID'"
        srun mkdir -p "$SPARK_LOCAL_DIRS" "$SPARK_WORKER_DIR"
        srun sh -c 'echo "$(hostname): $(df -h|grep nvme)"'
    else
        export SPARK_WORKER_DIR="$workdir/work"
    fi

    # Reuse a previously recorded master only if a master JVM still runs
    # there; without a record there is no host to probe at all.
    master=none
    if [[ -f "$workdir/spark_master" ]]; then
        master=$(grep -oe '\(\w*\.\)\{1,\}\w*' < "$workdir/spark_master")
        if [[ -z "$master" ]] ||
            ! ssh "$master" jps -lm 2>/dev/null < /dev/null |
                grep -q org.apache.spark.deploy.master.Master; then
            master=none
            rm -f "$workdir/spark_master"
        fi
    fi

    if [[ ! -d "$workdir" ]]; then
        echo ">> Creating working directory '$workdir'"
        mkdir -p "$workdir"
    fi

    if [[ "$master" = "none" ]]; then
        echo ">> Copying myself to working directory '$workdir'"
        # Skip the copy when we are already running from that very file.
        [[ "$myself" -ef "$workdir/$script" ]] || cp "$myself" "$workdir/$script"
    else
        echo "<< Connecting to master $master"
    fi

    if [[ -n "$SM_EXECUTE" ]]; then
        rm -f "$workdir/done"
    fi

    srun "$workdir/$script" _slurm_start_processes "$workdir" "$master" "$envscript"
    if [[ -n "$SM_EXECUTE" ]]; then
        while [[ ! -f "$workdir/done" ]]; do sleep 10s; done
        code=$(cat "$workdir/done")
        echo ">> Exiting with $code"
        # The marker may be empty (it is also created with plain `touch`);
        # treat that as success.
        exit "${code:-0}"
    fi
}

# Signal a SLURM-hosted cluster to wind down by creating the "done" marker
# file inside the working directory given as $1 (usage is shown when empty).
_slurm_stop_cluster() {
    workdir=$1
    if [[ -z "${workdir}" ]]; then
        usage
    fi
    touch "${workdir}/done"
}

# Start the HDFS daemons for the current SLURM task.
#
# Arguments:
#   $1 - working directory (required; usage is shown when empty)
# Does nothing when HADOOP_HOME is empty.
# Globals read:    HADOOP_HOME, SLURM_NODELIST, SLURM_JOBID, SLURM_PROCID
# Globals written: HDFS_MASTER_NODE, HDFS_DATA_DIR, HDFS_TMP_DIR,
#                  HDFS_NAME_DIR, HADOOP_CONF_DIR, HADOOP_LOG_DIR (exported)
#
# Writes core-site.xml and hdfs-site.xml into a per-task configuration
# directory. Task 0 formats and starts the name node and then creates
# "$workdir/hadoop/namenode"; every task waits for that file before starting
# its data node in the background.
_hdfs_start_processes() {
    local workdir=$1

    [[ -z "$workdir" ]] && usage
    [[ -z "$HADOOP_HOME" ]] && return

    # SLURM_NODELIST may contain bracket expressions (e.g. node[1-4]); quote
    # it so the shell never tries to glob-expand it.
    HDFS_MASTER_NODE=$(scontrol show hostname "$SLURM_NODELIST" | head -n 1)
    export HDFS_MASTER_NODE
    export HDFS_DATA_DIR="/nvme/$(whoami)/$SLURM_JOBID/hadoop/data/$SLURM_JOBID.$SLURM_PROCID"
    export HDFS_TMP_DIR="/nvme/$(whoami)/$SLURM_JOBID/hadoop/tmp/$SLURM_JOBID.$SLURM_PROCID"
    export HDFS_NAME_DIR="$workdir/hadoop/name/"

    export HADOOP_CONF_DIR="$workdir/hadoop/conf/$SLURM_JOBID.$SLURM_PROCID"
    export HADOOP_LOG_DIR="$workdir/hadoop/logs"

    mkdir -p "$HDFS_DATA_DIR" "$HDFS_TMP_DIR" "$HDFS_NAME_DIR" "$HADOOP_CONF_DIR" "$HADOOP_LOG_DIR"

    cat > "$HADOOP_CONF_DIR/core-site.xml" <<EOF
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
  <name>hadoop.tmp.dir</name>
  <value>$HDFS_TMP_DIR</value>
  <description>A base for other temporary directories.</description>
</property>
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://$HDFS_MASTER_NODE:54310</value>
</property>
</configuration>
EOF

    cat > "$HADOOP_CONF_DIR/hdfs-site.xml" <<EOF
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file://$HDFS_NAME_DIR</value>
    <description>Determines where on the local filesystem the DFS name node should store the name table.  If this is a comma-delimited list of directories then the name table is replicated in all of the directories, for redundancy. </description>
    <final>true</final>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>$HDFS_DATA_DIR</value>
    <description>Determines where on the local filesystem an DFS data node should store its blocks.  If this is a comma-delimited list of directories, then data will be stored in all named directories, typically on different devices.  Directories that do not exist are ignored.  </description>
    <final>true</final>
  </property>
  <property>
   <name>dfs.namenode.secondary.http-address</name>
   <value>$HDFS_MASTER_NODE:50090</value>
   <description>The secondary namenode http server address and port.</description>
   <final>true</final>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
    <final>true</final>
  </property>
</configuration>
EOF

    if [[ $SLURM_PROCID -eq 0 ]]; then
        "$HADOOP_HOME/bin/hdfs" namenode -format -force -nonInteractive
        "$HADOOP_HOME/sbin/hadoop-daemon.sh" --config "$HADOOP_CONF_DIR" --script hdfs start namenode &
        # Give the name node some time to come up before releasing the others.
        sleep 15s
        touch "$workdir/hadoop/namenode"
    fi
    while [[ ! -f "$workdir/hadoop/namenode" ]]; do sleep 0.5s; done
    "$HADOOP_HOME/sbin/hadoop-daemon.sh" --config "$HADOOP_CONF_DIR" --script hdfs start datanode &
}

# Per-task entry point launched through srun: starts HDFS (if configured),
# the Spark master (task 0, when no master is being reused) and a Spark
# worker, then either runs SM_EXECUTE or waits for "$workdir/done".
#
# Arguments:
#   $1 - working directory (required; usage is shown when empty)
#   $2 - master to connect to, or "none" to start a fresh one
#   $3 - optional environment script, sourced if it is a regular file
# Globals read:    SLURM_PROCID, SLURM_NODELIST, SLURM_MEM_PER_NODE,
#                  SLURM_CPUS_ON_NODE, SLURM_MEM_PER_CPU, SLURM_CPUS_PER_TASK,
#                  SM_WORKER_COUNT, SM_WORKER_MEMORY, SM_WORKER_CORES,
#                  SM_EXECUTE, SPARK_HOME, SPARK_MASTER_PORT
# Globals written: SM_WORKDIR, SPARK_LOG_DIR, SM_MASTER_MEMORY,
#                  SPARK_DAEMON_MEMORY, SPARK_MASTER_IP, SPARK_WORKER_CORES,
#                  SPARK_WORKER_MEMORY (exported)
# Exits the shell with 1 when a start script reports "failed", and with 0 for
# tasks beyond SM_WORKER_COUNT.
_slurm_start_processes() {
    local workdir=$1
    local master=$2
    local envscript=$3
    [[ -z "$workdir" ]] && usage

    export SM_WORKDIR="$workdir"

    [[ -f "$envscript" ]] && . "$envscript"

    _hdfs_start_processes "$workdir"

    export SPARK_LOG_DIR="$workdir/logs"

    export SM_MASTER_MEMORY=${SM_MASTER_MEMORY:-4096}

    local output
    if [[ "$master" = "none" && $SLURM_PROCID -eq 0 ]]; then
        export SPARK_DAEMON_MEMORY=${SM_MASTER_MEMORY}m
        export SPARK_MASTER_IP=$(hostname)

        output=$("$SPARK_HOME/sbin/start-master.sh")
        echo "$output"
        echo "spark://$SPARK_MASTER_IP:${SPARK_MASTER_PORT:-7077}" > "$workdir/spark_master"
        [[ "$output" == *failed* ]] && exit 1
    fi

    if [[ "$master" = "none" ]]; then
        master=
    fi

    if [[ -n "$SM_WORKER_COUNT" && $SLURM_PROCID -ge $SM_WORKER_COUNT ]]; then
        exit 0
    fi

    # Wait until the master has published its URL.
    while [[ ! -f "$workdir/spark_master" ]]; do sleep 0.5s; done

    local mem node_mem detected
    if [[ -n "$SM_WORKER_MEMORY" ]]; then
        mem=$SM_WORKER_MEMORY
    else
        node_mem=${SLURM_MEM_PER_NODE:-$((SLURM_CPUS_ON_NODE * SLURM_MEM_PER_CPU))}
        mem=$((node_mem - SM_MASTER_MEMORY))
    fi
    detected=$(_detect_memory)

    # Cap the request only when it really exceeds what the node has available
    # (a string test on the arithmetic result would always be true).
    if [[ -n "$detected" ]] && ((mem > detected)); then
        echo "> fixing memory to ${detected} MiB"
        mem=$detected
    fi

    export SPARK_WORKER_CORES=${SM_WORKER_CORES:-${SLURM_CPUS_PER_TASK:-$SLURM_CPUS_ON_NODE}}
    export SPARK_WORKER_MEMORY=${mem}m

    # NOTE(review): a reused master arrives here as whatever the caller
    # extracted from spark_master; the fallback assumes the default port 7077.
    local master_node
    master_node=${master:-spark://$(scontrol show hostname "$SLURM_NODELIST" | head -n 1):7077}

    echo "> Running workers with ${SPARK_WORKER_CORES} cores and ${SPARK_WORKER_MEMORY} memory"
    echo "> Connecting to ${master_node}"

    output=$("$SPARK_HOME/sbin/start-slave.sh" "$master_node")
    echo "$output"
    [[ "$output" == *failed* ]] && exit 1

    if [[ -n "$SM_EXECUTE" && $SLURM_PROCID -eq 0 ]]; then
        echo ">> Sleeping 5s for cluster startup"
        sleep 5
        echo ">> Running command $SM_EXECUTE"
        # SM_EXECUTE is a user-supplied command line and is evaluated as shell
        # code by design; its exit status is stored in the "done" marker.
        eval "$SM_EXECUTE; echo \$? > \"\$workdir/done\""
    else
        while [[ ! -f "$workdir/done" ]]; do sleep 10s; done
    fi
}

# Start a cluster in the current batch allocation and exit with the status of
# the batch-specific start routine.
#
# Arguments:
#   $1 - working directory; defaults to ./_cluster and is created if missing
#   $2 - environment script; when empty, the first existing file among
#        $workdir/env.sh, $workdir/spark-env.sh, $SPARK_CONF_DIR/spark-env.sh,
#        $SPARK_CONF_DIR/conf/spark-env.sh and $SPARK_HOME/conf/spark-env.sh
#        is used
# Exits with 2 when no batch system is detected.
startup() {
    local workdir=${1:-./_cluster}
    local envscript=$2
    local batch candidate
    local -a candidates

    batch=$(_detect_allocation)
    if [[ -z "$batch" ]]; then
        echo "cannot detect batch system"
        exit 2
    fi

    if [[ -z "$envscript" ]]; then
        candidates=("$workdir/env.sh" "$workdir/spark-env.sh")
        # Only consult the Spark locations when the variables are set, so that
        # an unset variable never makes us probe a path under "/".
        if [[ -n "$SPARK_CONF_DIR" ]]; then
            # SPARK_CONF_DIR is the configuration directory itself; the
            # nested "conf" path is kept for backward compatibility.
            candidates+=("$SPARK_CONF_DIR/spark-env.sh" "$SPARK_CONF_DIR/conf/spark-env.sh")
        fi
        if [[ -n "$SPARK_HOME" ]]; then
            candidates+=("$SPARK_HOME/conf/spark-env.sh")
        fi
        for candidate in "${candidates[@]}"; do
            if [[ -f "$candidate" ]]; then
                envscript=$candidate
                break
            fi
        done
    fi

    [[ ! -d "$workdir" ]] && mkdir -p "$workdir"

    echo ">>> Running on ${batch}"
    echo ">>> Working directory:  ${workdir}"
    echo ">>> Environment script: ${envscript}"
    echo ">>> Dedicated environment variables:"
    env | grep -E '^SM_'
    "_${batch}_start_cluster" "$workdir" "$envscript"
    exit $?
}

# Stop the cluster running in the current batch allocation.
#
# Arguments:
#   $1 - working directory; defaults to ./_cluster, the same default used by
#        startup, so that a bare `sm_shutdown` works as the usage text shows
#   $2 - environment script; when empty, $workdir/env.sh or
#        $workdir/spark-env.sh is used if present
# Shows usage (and thereby exits) when no batch system is detected.
shutdown() {
    local workdir=${1:-./_cluster}
    local envscript=$2
    local batch candidate

    batch=$(_detect_allocation)
    if [[ -z "$batch" ]]; then
        echo "!!! Cannot detect batch system"
        usage
    fi

    if [[ -z "$envscript" ]]; then
        for candidate in "$workdir/env.sh" "$workdir/spark-env.sh"; do
            if [[ -f "$candidate" ]]; then
                envscript=$candidate
                break
            fi
        done
    fi

    echo ">>> Running on ${batch}"
    "_${batch}_stop_cluster" "$workdir" "$envscript"
}

# Parse the sm_run command line, export the requested settings and start the
# cluster with the remaining words as the command to execute.
#
# Options:
#   -c CORES   export SM_WORKER_CORES
#   -m MEMORY  export SM_WORKER_MEMORY
#   -h DIR     export HADOOP_HOME
#   -H         export an empty HADOOP_HOME
#   -w DIR     working directory passed to startup
#   -e FILE    environment script passed to startup
# At least one non-option word is required; all of them are joined into
# SM_EXECUTE. Exits with the status of startup.
run() {
    local workdir=
    local envscript=
    local opt OPTIND=1
    # The leading ':' selects getopts' silent mode, which is what makes the
    # ':' (missing argument) case reachable and fills OPTARG with the
    # offending option letter in both error cases.
    while getopts ":c:e:Hh:m:w:" opt; do
        case "$opt" in
            c)
                export SM_WORKER_CORES=$OPTARG
                ;;
            m)
                export SM_WORKER_MEMORY=$OPTARG
                ;;
            h)
                export HADOOP_HOME=$OPTARG
                ;;
            H)
                export HADOOP_HOME=""
                ;;
            w)
                workdir=$OPTARG
                ;;
            e)
                envscript=$OPTARG
                ;;
            \?)
                echo "invalid option: -$OPTARG" >&2
                usage
                ;;
            :)
                echo "option -$OPTARG requires an argument" >&2
                usage
                ;;
        esac
    done
    shift $((OPTIND - 1))
    [[ $# -ge 1 ]] || usage
    export SM_EXECUTE="$*"

    startup "$workdir" "$envscript"
    exit $?
}

# Entry point: decide which function to run.
# - Invoked as "sm_cluster": the first argument names the function.
# - Invoked under any other name: the name with a leading "sm_" removed is
#   the function (e.g. sm_run -> run).
base=$(basename -- "$0")
if [[ "$base" == "sm_cluster" ]]; then
    cmd=$1
    shift
else
    cmd=${base##sm_}
fi

# Only shell functions are valid commands; anything else shows the usage.
if [[ -z "$cmd" ]] || ! declare -F -- "$cmd" > /dev/null; then
    usage
fi
# "$@" keeps every argument intact (spaces, glob characters).
"$cmd" "$@"
