#!/usr/bin/env python
from __future__ import print_function

import uuid
import glob
import os, yaml
import sys
import time
import xbow
from xbow.metering import SpotMeter
from xbow.instances import get_by_name, ConnectedInstance
from xbow.filesystems import fs_id_from_name
from xbow import pools

def pack_and_run_remote(command):
    """
    Stage the contents of the current directory to the xbow cluster, then
    run the given command, creating the neccessary worker instance.
    """
    cfg_file = os.path.join(xbow.XBOW_CONFIGDIR, "settings.yml")

    with open(cfg_file, 'r') as ymlfile:
        cfg = yaml.load(ymlfile)

    instances = get_by_name(cfg['scheduler_name'])
    if len(instances) == 0:
        raise ValueError('Error - no such instance')
    elif len(instances) > 1:
        raise ValueError('Error - more than one instance has that name')
    instance = instances[0]
    ci = ConnectedInstance(instance)
    az = instance.placement['AvailabilityZone']
    meter = SpotMeter(cfg['worker_instance_type'], az, count = cfg['pool_size'])
    jobid = uuid.uuid4()
    mount_point=cfg['mount_point']

    ci.exec_command('sudo apt update && sudo apt install -y task-spooler && tsp -S 80')
    print('remote directory will be {}/{}'.format(mount_point, jobid))
    ci.exec_command('mkdir {}/{}'.format(mount_point, jobid))
    infiles = glob.glob('*')
    print('uploading files:')
    for filename in infiles:
        if os.path.isfile(filename):
            print('    {}'.format(filename))
            ci.upload(filename, '{}/{}/{}'.format(mount_point, jobid, filename))
    ci.exec_command("cd {}/{} && tsp xflow-exec '{}'".format(mount_point, jobid, command)) 
    tsp_id = ci.output[:-1]
    print('tsp job {} submitted.'.format(tsp_id))

    HOSTFILE = open(os.path.expanduser('.xbow_ids.yml'), 'w+')
    HOSTFILE.write('jobid: {}\n'.format(jobid))
    HOSTFILE.write('tsp_id: {}\n'.format(tsp_id))
    HOSTFILE.close()

    exit(0)
    #
    # Stuff below is disabled for now - we assume the cluster has been
    # created with a worker node ready to go. This is because the
    # script here can't pick up any provisioning that might have been
    # dome when the original cluster was launched. Since this typically
    # includes things like adding the MD codes (via pinda), an extra
    # worker launched without this stuff is not going to be much use.
    #
    cfg['scheduler_ip_address'] = instance.private_ip_address
    cfg['fs_id'] = fs_id_from_name(cfg['shared_file_system'])
    user_data = '''Content-Type: multipart/mixed; boundary="//"
MIME-Version: 1.0

--//
Content-Type: text/cloud-config; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Content-Disposition: attachment; filename="cloud-config.txt"

#cloud-config
cloud_final_modules:
- [scripts-user, always]

--//
Content-Type: text/x-shellscript; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Content-Disposition: attachment; filename="userdata.txt"

#!/bin/bash
pip install dask distributed && sudo -u ubuntu dask-worker --local-directory /tmp/dask --nthreads 1 --nprocs 1 --worker-port 45792 {scheduler_ip_address}:8786 &
mkdir -p {mount_point}
mount -t nfs -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2 {fs_id}.efs.{region}.amazonaws.com:/ {mount_point}
chmod go+rw {mount_point}
echo 'SHARED={mount_point}' >> /etc/environment
'''.format(**cfg)

    print("Creating a worker, this may take some time...")

    sip = pools.create_spot_pool(cfg['worker_pool_name'],
                            count=1,
                            price=cfg['price'],
                            image_id=cfg['image_id'],
                            region=cfg['region'],
                            instance_type=cfg['worker_instance_type'],
                            ec2_security_groups=cfg['ec2_security_groups'],
                            user_data=user_data,
                            append=True, wait=False
                          )
    print("Instance running")

if __name__ == '__main__':
    command = ' '.join(sys.argv[1:])
    pack_and_run_remote(command)
