#!/bin/bash
# Run the scalars sql job against all Glean pings in a product. Assumes that
# generate_fenix_sql has already been run.

set -e

# Print yesterday's date (relative to the local clock) as YYYY-MM-DD.
# Python is used because `date` is not consistent across MacOS and GNU/Linux.
function yesterday {
    python3 -c '
from datetime import date, timedelta
one_day = timedelta(days=1)
print((date.today() - one_day).strftime("%Y-%m-%d"))
'
}

# Every setting below can be overridden from the environment; the default is
# only applied when the variable is unset or empty.

# Project of the generated SQL
: "${PROJECT:=glam-fenix-dev}"
# The DST project is the location of the destination tables, which may be
# different from PROJECT. One example would be for running desktop SQL into a
# dataset that's not moz-fx-data-shared-prod.
: "${DST_PROJECT:=$PROJECT}"
# Dataset name that appears in the generated SQL.
: "${PROD_DATASET:=glam_etl}"
# Dataset that the queries are actually run against.
: "${DATASET:=glam_etl_dev}"
# Date passed to the queries as the submission_date parameter. This one stays
# a plain assignment so that a failure of `yesterday` is still reported as the
# status of the assignment.
if [[ -z "${SUBMISSION_DATE:-}" ]]; then
    SUBMISSION_DATE=$(yesterday)
fi


# Print the SQL file at the given path with every occurrence of PROD_DATASET
# rewritten to DATASET, so the entire pipeline can run within a separate
# dataset. This is effectively a hack standing in for queries generated in the
# correct location, but it provides a mechanism for changing the dataset
# location for testing.
#
# Arguments:
#   $1 - path of the SQL file to rewrite
# Outputs:
#   the rewritten SQL on stdout
function replace_dataset {
    local source_file=$1
    local substitution="s/${PROD_DATASET}/${DATASET}/g"
    sed "$substitution" < "$source_file"
}

# Run sql/$PROJECT/$PROD_DATASET/<query_location>/query.sql through `bq query`,
# after rewriting PROD_DATASET to DATASET, and write the result to a table in
# DST_PROJECT / DATASET.
#
# Arguments:
#   $1 - destination table name
#   $2 - "true" to write to the SUBMISSION_DATE day partition of the
#        destination table (default: false)
#   $3 - value of the min_sample_id query parameter (default: 0)
#   $4 - value of the max_sample_id query parameter (default: 99)
#   $5 - directory that holds query.sql (default: the destination table name)
#   $6 - whitespace-separated extra flags for `bq query` (default: --replace)
#   $7 - value of the sample_size query parameter (default: 10)
# Returns:
#   the status of the failing step, or 0 on success
function run_query {
    local destination_table=$1
    local time_partition=${2:-false}
    local min_sample_id=${3:-0}
    local max_sample_id=${4:-99}
    local query_location=${5:-$destination_table}
    local additional_arguments="${6:---replace}"
    local sample_size=${7:-10}
    local query="sql/$PROJECT/$PROD_DATASET/$query_location/query.sql"

    # Split the caller's flag string into words without exposing it to
    # pathname expansion.
    local -a extra_flags=()
    IFS=$' \t\n' read -r -a extra_flags <<< "$additional_arguments"

    # add an option to write to a time-partitioned table; the flag lives in an
    # array so that no empty argument reaches bq when partitioning is off
    local -a partition_flags=()
    if [[ $time_partition == true ]]; then
        destination_table="${destination_table}\$${SUBMISSION_DATE//-/}"
        partition_flags=(--time_partitioning_type=DAY)
    fi

    echo "running $query"
    local tmp
    tmp=$(mktemp) || return
    local status=0
    replace_dataset "$query" > "$tmp" || status=$?
    if ((status == 0)); then
        bq query \
            --max_rows=0 \
            --use_legacy_sql=false \
            "${extra_flags[@]}" \
            --project_id="$DST_PROJECT" \
            --dataset_id="$DATASET" \
            --destination_table="$destination_table" \
            --parameter="submission_date:DATE:$SUBMISSION_DATE" \
            --parameter="min_sample_id:INT64:$min_sample_id" \
            --parameter="max_sample_id:INT64:$max_sample_id" \
            --parameter="sample_size:INT64:$sample_size" \
            "${partition_flags[@]}" \
            < "$tmp" || status=$?
    fi
    # The rewritten SQL is only needed for the duration of the bq call.
    rm -f -- "$tmp"
    return "$status"
}


# Run sql/$PROJECT/$PROD_DATASET/<table>/init.sql, with PROD_DATASET rewritten
# to DATASET, unless `bq show` already finds DATASET.<table>.
#
# Arguments:
#   $1 - destination table name, also the directory that holds init.sql
# Returns:
#   0 when the table already exists or init.sql ran successfully, otherwise
#   the status of the failing step
function run_init {
    local destination_table=$1
    local init="sql/$PROJECT/$PROD_DATASET/$destination_table/init.sql"

    # run if needed
    # NOTE(review): this existence check passes no --project_id, while the
    # query below runs with --project_id="$DST_PROJECT" -- confirm that is
    # intended when DST_PROJECT differs from the default project.
    if bq show "${DATASET}.${destination_table}" &> /dev/null; then
        return 0
    fi

    echo "running $init"
    local tmp
    tmp=$(mktemp) || return
    local status=0
    replace_dataset "$init" > "$tmp" || status=$?
    if ((status == 0)); then
        bq query \
            --use_legacy_sql=false \
            --project_id="$DST_PROJECT" \
            < "$tmp" || status=$?
    fi
    # The rewritten SQL is only needed for the duration of the bq call.
    rm -f -- "$tmp"
    return "$status"
}


# Run sql/$PROJECT/$PROD_DATASET/<view>/view.sql through `bq query` in
# DST_PROJECT / DATASET, with PROD_DATASET rewritten to DATASET.
#
# Arguments:
#   $1 - name of the directory that holds view.sql
# Returns:
#   the status of the failing step, or 0 on success
function run_view {
    local view_name=$1
    local view="sql/$PROJECT/$PROD_DATASET/$view_name/view.sql"

    echo "running $view"
    local tmp
    tmp=$(mktemp) || return
    local status=0
    replace_dataset "$view" > "$tmp" || status=$?
    if ((status == 0)); then
        bq query \
            --use_legacy_sql=false \
            --project_id="$DST_PROJECT" \
            --dataset_id="$DATASET" \
            < "$tmp" || status=$?
    fi
    # The rewritten SQL is only needed for the duration of the bq call.
    rm -f -- "$tmp"
    return "$status"
}


# Run a table's query once per contiguous range of sample ids, appending each
# result to the destination table. The ranges together always cover sample ids
# 0-99; when num_partitions does not divide 100 evenly the final range absorbs
# the remainder.
#
# Arguments:
#   $1 - table name (also used as the query directory)
#   $2 - number of ranges to split the sample ids into, 1-100 (default: 5)
#   $3 - forwarded to run_query as its time_partition argument (default: true)
#   $4 - optional value for --time_partitioning_field
#   $5 - optional value for --clustering_fields
# Returns:
#   1 when num_partitions is not an integer between 1 and 100
function run_partitioned_query {
    local table_name=$1
    local num_partitions=${2:-5}
    local partitioned_result=${3:-true}

    local additional_arguments="--append_table"
    if [[ -n "${4:-}" ]]; then
        additional_arguments+=" --time_partitioning_field=$4"
    fi
    if [[ -n "${5:-}" ]]; then
        additional_arguments+=" --clustering_fields=$5"
    fi

    local NUM_SAMPLE_IDS=100
    # Reject values that would divide by zero or yield empty ranges.
    if ! [[ $num_partitions =~ ^[1-9][0-9]*$ ]] || ((num_partitions > NUM_SAMPLE_IDS)); then
        echo "num_partitions must be an integer between 1 and $NUM_SAMPLE_IDS, got: $num_partitions" >&2
        return 1
    fi
    local PARTITION_SIZE=$((NUM_SAMPLE_IDS / num_partitions))

    # An arithmetic loop is used instead of `seq`, which counts downwards on
    # BSD/MacOS when the end is smaller than the start.
    local partition min_param max_param
    for ((partition = 0; partition < num_partitions; partition++)); do
        min_param=$((partition * PARTITION_SIZE))
        if ((partition == num_partitions - 1)); then
            max_param=$((NUM_SAMPLE_IDS - 1))
        else
            max_param=$((min_param + PARTITION_SIZE - 1))
        fi
        run_query "$table_name" "$partitioned_result" "$min_param" "$max_param" \
            "$table_name" "$additional_arguments"
    done
}


# Run the desktop pipeline. Stages 0-4 are each skipped when START_STAGE
# (default: 0) is greater than the stage number; the bucket counts, probe
# counts, percentiles and user counts at the end always run.
function run_desktop_sql {
    local start_stage=${START_STAGE:-0}
    # Flags shared by every query that writes into a clients_daily table.
    local daily_flags="--clustering_fields=app_version,channel --time_partitioning_field=submission_date"

    if ((start_stage <= 0)); then
        run_query "latest_versions"
    fi

    # Run clients_daily_*_scalar_aggregates; the first query replaces the day
    # partition and the keyed variants are appended to it.
    if ((start_stage <= 1)); then
        run_init "clients_daily_scalar_aggregates_v1"
        run_query "clients_daily_scalar_aggregates_v1" \
            true 0 0 "clients_daily_scalar_aggregates_v1" \
            "--replace $daily_flags"
        run_query "clients_daily_scalar_aggregates_v1" \
            true 0 0 "clients_daily_keyed_scalar_aggregates_v1" \
            "--append_table $daily_flags"
        run_query "clients_daily_scalar_aggregates_v1" \
            true 0 0 "clients_daily_keyed_boolean_aggregates_v1" \
            "--append_table $daily_flags"
    fi

    # Run clients_daily_*_histogram_aggregates
    if ((start_stage <= 2)); then
        run_init "clients_daily_histogram_aggregates_v1"
        run_query "clients_daily_histogram_aggregates_v1" \
            true 0 0 "clients_daily_histogram_aggregates_v1" \
            "--replace $daily_flags"
        run_query "clients_daily_histogram_aggregates_v1" \
            true 0 0 "clients_daily_keyed_histogram_aggregates_v1" \
            "--append_table $daily_flags"
    fi

    # Run the rest of the clients scalar pipeline
    if ((start_stage <= 3)); then
        run_init "clients_scalar_aggregates_v1"
        run_query "clients_scalar_aggregates_v1"
        run_query "clients_scalar_probe_counts_v1"
        run_query "scalar_percentiles_v1"
    fi

    # Run the rest of the clients histogram pipeline
    if ((start_stage <= 4)); then
        run_init "clients_histogram_aggregates_v1"
        run_query "clients_histogram_aggregates_new_v1"
        # The partition count has to be passed explicitly: it is the second
        # positional argument, and leaving it out shifts `true` and the field
        # names into the wrong parameters.
        # NOTE(review): 5 is run_partitioned_query's own default -- confirm it
        # is the intended number of partitions for this table.
        run_partitioned_query "clients_histogram_aggregates_v1" \
            5 true "submission_date" "sample_id,app_version,channel"
    fi

    run_partitioned_query "clients_histogram_bucket_counts_v1" 10 false
    run_query "clients_histogram_probe_counts_v1"

    run_query "histogram_percentiles_v1"
    run_query "glam_user_counts_v1"
}

function wait_pids {
    # Wait for the results from a list of pids, so the script can exit on the
    # first encountered error.
    # https://stackoverflow.com/a/356154
    # https://askubuntu.com/a/674347
    #
    # Arguments: the pids to wait for, in order.
    # Returns: the exit status of the first pid that failed, or 0 when all of
    # them succeeded. The failure is returned explicitly so that it is not
    # lost when the caller runs without `set -e`.
    local pid
    for pid in "$@"; do
        wait "$pid" || return
    done
}

# Run the pipeline for a Glean product.
#
# Arguments:
#   $1 - product name, used as the "<product>__" prefix of every table,
#        view and query directory
# Environment:
#   STAGE         - required; one of daily, incremental, all
#   START_STAGE   - skip steps guarded by a lower stage number (default: 0)
#   BACKFILL_ONLY - "true" to return after the clients aggregates of the
#                   incremental stage (default: false)
#   EXPORT_ONLY   - "true" to skip the daily aggregates and the clients
#                   aggregates (default: false)
# Exits with status 1 when STAGE is invalid or when BACKFILL_ONLY and
# EXPORT_ONLY are both true.
function run_glean_sql {
    local error="STAGE must be one of (daily, incremental, all)"
    local product=$1
    local stage=${STAGE?$error}
    local start_stage=${START_STAGE:-0}
    local backfill_only=${BACKFILL_ONLY:-false}
    local export_only=${EXPORT_ONLY:-false}
    local directory

    # used to store any process ids
    local -a pids=()

    if [[ $backfill_only = true && $export_only = true ]]; then
        echo "BACKFILL_ONLY and EXPORT_ONLY are mutually exclusive" >&2
        exit 1
    fi

    if ! [[ $stage == "daily" || $stage == "incremental" || $stage == "all" ]]; then
        echo "$error" >&2
        exit 1
    fi
    if [[ $stage == "daily" || $stage == "all" ]]; then
        if ((start_stage <= 0)) && [[ $export_only = false ]]; then
            pids=()
            # Look for the query directories under PROD_DATASET, which is
            # where run_query reads query.sql from.
            for directory in sql/"$PROJECT"/"$PROD_DATASET"/"${product}"__clients_daily_scalar_aggregates*/; do
                run_query "$(basename "$directory")" true &
                pids+=("$!")
            done
            for directory in sql/"$PROJECT"/"$PROD_DATASET"/"${product}"__clients_daily_histogram_aggregates*/; do
                run_query "$(basename "$directory")" true &
                pids+=("$!")
            done
            wait_pids "${pids[@]}"

            pids=()
            # run in daily and incremental
            run_view "${product}__view_clients_daily_scalar_aggregates_v1" &
            pids+=("$!")
            run_view "${product}__view_clients_daily_histogram_aggregates_v1" &
            pids+=("$!")
            wait_pids "${pids[@]}"
        fi
    fi
    if [[ $stage == "incremental" || $stage == "all" ]]; then
        if ((start_stage <= 1)) && [[ $export_only = false ]]; then
            run_view "${product}__view_clients_daily_histogram_aggregates_v1"
            # latest versions now depends on scalar aggregates
            run_view "${product}__view_clients_daily_scalar_aggregates_v1"
            run_query "${product}__latest_versions_v1"

            pids=()
            run_init "${product}__clients_scalar_aggregates_v1" &
            pids+=("$!")
            run_init "${product}__clients_histogram_aggregates_v1" &
            pids+=("$!")
            wait_pids "${pids[@]}"

            pids=()
            run_query "${product}__clients_scalar_aggregates_v1" &
            pids+=("$!")
            run_query "${product}__clients_histogram_aggregates_v1" &
            pids+=("$!")
            wait_pids "${pids[@]}"
        fi
        if [[ $backfill_only = true ]]; then
            return
        fi
        if ((start_stage <= 2)); then
            # Each scalar/histogram pair runs in parallel; the pairs run in
            # sequence.
            pids=()
            run_query "${product}__scalar_bucket_counts_v1" &
            pids+=("$!")
            run_query "${product}__histogram_bucket_counts_v1" &
            pids+=("$!")
            wait_pids "${pids[@]}"

            pids=()
            run_query "${product}__scalar_probe_counts_v1" &
            pids+=("$!")
            run_query "${product}__histogram_probe_counts_v1" &
            pids+=("$!")
            wait_pids "${pids[@]}"

            pids=()
            run_query "${product}__scalar_percentiles_v1" &
            pids+=("$!")
            run_query "${product}__histogram_percentiles_v1" &
            pids+=("$!")
            wait_pids "${pids[@]}"
        fi
        pids=()
        run_view "${product}__view_probe_counts_v1" &
        pids+=("$!")
        run_view "${product}__view_user_counts_v1" &
        pids+=("$!")
        wait_pids "${pids[@]}"

        pids=()
        run_query "${product}__extract_user_counts_v1" &
        pids+=("$!")
        run_query "${product}__extract_probe_counts_v1" &
        pids+=("$!")
        wait_pids "${pids[@]}"
    fi
}


# Entry point: point gcloud at PROJECT, make sure DATASET exists, then run the
# desktop or the Glean pipeline.
#
# Environment:
#   PRODUCT     - required; "desktop" selects run_desktop_sql, anything else
#                 is passed to run_glean_sql
#   RESET       - "true" to force delete DATASET before running
#                 (default: false)
#   EXPORT_ONLY - "true" is rejected in combination with RESET
#                 (default: false)
function main {
    # Paths to the SQL files are relative to the directory two levels above
    # this script.
    cd "$(dirname "$0")/../.." || exit

    local reset=${RESET:-false}
    local export_only=${EXPORT_ONLY:-false}
    local product=${PRODUCT?PRODUCT must be defined}

    if [[ $reset = true && $export_only = true ]]; then
        echo "RESET and EXPORT_ONLY are mutually exclusive" >&2
        exit 1
    fi

    # revert to the original default project in the environment
    # original_project is intentionally not local: cleanup runs from the EXIT
    # trap, after main has returned.
    original_project=$(gcloud config get-value project)
    function cleanup {
        if [[ -n "$original_project" ]]; then
            gcloud config set project "$original_project"
        else
            # No project was configured beforehand, so remove the setting
            # instead of storing an empty project name.
            gcloud config unset project
        fi
    }
    trap cleanup EXIT

    gcloud config set project "$PROJECT"

    # force delete the dataset; RESET is compared as a string so that its
    # contents are never executed as a command
    if [[ $reset == true ]]; then
        bq rm -r -f "$DATASET"
    fi
    if ! bq ls "${PROJECT}:${DATASET}" &> /dev/null; then
        bq mk "$DATASET"
    fi

    if [[ "$product" = "desktop" ]]; then
        run_desktop_sql
    else
        run_glean_sql "$product"
    fi
}

# Run the pipeline unless IMPORT is set to a non-empty value, which lets the
# functions above be loaded without executing anything.
case "${IMPORT:-}" in
    "") main ;;
esac
