####
# FMA (Flajolet-Martin Algorithm): estimate the number of distinct users
####

### Mapper

#!/usr/bin/env python
import sys
import csv

def trailing_zeroes(binary_str):
    """Return how many consecutive '0' characters end *binary_str*.

    The scan runs from the last character backwards and stops at the first
    character that is not '0', so any prefix (such as the '0b' produced by
    ``bin()``) is never counted unless the whole tail up to it is zeroes.
    """
    zero_run = 0
    for ch in reversed(binary_str):
        if ch != '0':
            break
        zero_run += 1
    return zero_run

def main():
    """Mapper: emit ``<trailing-zero count>\\t1`` for every non-empty user name.

    Reads CSV rows from standard input, discards the first row, and for each
    remaining row that has more than 7 columns takes column index 7 (the
    'name' column), strips it, hashes it, and prints the number of trailing
    zero bits of the hash followed by a tab and the literal ``1``.

    The hash is a SHA-256 digest rather than the built-in ``hash()``: the
    built-in is salted per process on Python 3 (PYTHONHASHSEED) and depends
    on the platform word size on Python 2, so the same name could hash
    differently in different mapper tasks. A digest gives every task the
    same value for the same name.
    """
    # Local import keeps this block self-contained; hashlib is stdlib.
    import hashlib

    reader = csv.reader(sys.stdin)
    # Skip header.
    # NOTE(review): this drops the first row of whatever stream this process
    # receives; if the input is split across several mapper tasks, only one
    # of those first rows is presumably the real header -- confirm.
    next(reader, None)

    for row in reader:
        if len(row) <= 7:
            continue  # row too short to contain the 'name' column

        user_name = row[7].strip()
        if not user_name:
            continue

        # hashlib needs bytes; text strings (Python 3) are encoded first,
        # byte strings (Python 2 csv output) are used as they are.
        key = user_name
        if not isinstance(key, bytes):
            key = key.encode("utf-8")

        hash_value = int(hashlib.sha256(key).hexdigest(), 16)
        binary_repr = bin(hash_value)

        t_zeroes = trailing_zeroes(binary_repr)
        print("%d\t1" % t_zeroes)


if __name__ == "__main__":
    main()


## Reducer
#!/usr/bin/env python
import sys

def main():
    """Reducer: print a distinct-user estimate from the mapper's records.

    Each input line is expected to be ``<trailing-zero count>\\t<value>``.
    The largest count seen, R, gives the estimate ``2 ** R``. Lines that do
    not split into exactly two tab-separated fields, or whose first field is
    not an integer, are ignored. When no valid record is read at all the
    estimate is 0 instead of ``2 ** 0``, so an empty input is not reported
    as one user.
    """
    max_trailing_zeroes = 0
    saw_record = False

    for line in sys.stdin:
        fields = line.strip().split('\t')
        if len(fields) != 2:
            continue  # malformed record: skip it, best effort

        try:
            count = int(fields[0])
        except ValueError:
            # Only a non-numeric count is expected here; anything else
            # (e.g. KeyboardInterrupt) must propagate.
            continue

        saw_record = True
        if count > max_trailing_zeroes:
            max_trailing_zeroes = count

    if saw_record:
        estimated_distinct_users = 2 ** max_trailing_zeroes
    else:
        estimated_distinct_users = 0
    print("Estimated distinct users: %d" % estimated_distinct_users)


if __name__ == "__main__":
    main()

### COMMANDS

# Make sure the scripts are executable and use Unix line endings.
# The mapper and reducer above are expected to be saved as mapper.py and
# reducer.py in the current directory.
chmod +x mapper.py reducer.py
dos2unix mapper.py reducer.py      # optional but useful if files came from Windows

# Test mapper -> sort -> reducer locally (simulates the streaming pipeline).
# This assumes users.csv exists in the current directory and has a header row.
# sort -k1,1n orders the mapper output numerically on its first field.
cat users.csv | ./mapper.py | sort -k1,1n | ./reducer.py

# Create the HDFS input directory (-p: no error if it already exists).
hdfs dfs -mkdir -p /user/cloudera/fma_input

# Upload the file (use -f to overwrite if present).
hdfs dfs -put -f users.csv /user/cloudera/fma_input/
# Or, if it is already uploaded, list the directory to check:
hdfs dfs -ls /user/cloudera/fma_input

# Remove any previous output directory before running the job below
# (-f: no error if it does not exist).
hdfs dfs -rm -r -f /user/cloudera/fma_output

# Run the Flajolet-Martin job with the default `python` interpreter and a
# single reducer.
# Generic options (-D, -files) are listed before the streaming options
# (-mapper, -reducer, -input, ...), as the Hadoop Streaming documentation
# requires. The deprecated -jobconf is replaced by the equivalent -D.
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -D mapreduce.job.name="FMA_FlajoletMartin" \
  -D mapreduce.job.reduces=1 \
  -files mapper.py,reducer.py \
  -mapper "python mapper.py" \
  -reducer "python reducer.py" \
  -input /user/cloudera/fma_input/users.csv \
  -output /user/cloudera/fma_output \
  -cmdenv PYTHONIOENCODING=utf8


# Alternative: the same job, explicitly using the python2.6 interpreter.
# It writes to the same output directory as the job above, so that directory
# is cleared first; otherwise the job is rejected because the output
# directory already exists.
hdfs dfs -rm -r -f /user/cloudera/fma_output

# Generic options (-D, -files) come before the streaming options; the
# deprecated -jobconf is replaced by the equivalent -D.
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -D mapreduce.job.name="FMA_FlajoletMartin_py26" \
  -D mapreduce.job.reduces=1 \
  -files mapper.py,reducer.py \
  -mapper "python2.6 mapper.py" \
  -reducer "python2.6 reducer.py" \
  -input /user/cloudera/fma_input/users.csv \
  -output /user/cloudera/fma_output \
  -cmdenv PYTHONIOENCODING=utf8

# Debugging aid: map-only run (zero reduce tasks) to inspect the raw mapper
# output. The output directory is cleared first so the job can be re-run.
hdfs dfs -rm -r -f /user/cloudera/fma_output_maponly

# Generic options (-D, -files) come before the streaming options; the
# deprecated -jobconf is replaced by the equivalent -D.
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -D mapreduce.job.name="FMA_maponly" \
  -D mapreduce.job.reduces=0 \
  -files mapper.py \
  -mapper "python mapper.py" \
  -reducer /bin/cat \
  -input /user/cloudera/fma_input/users.csv \
  -output /user/cloudera/fma_output_maponly \
  -cmdenv PYTHONIOENCODING=utf8


# Inspect the job output: list the output directory, print the reducer's
# part file, then merge the output files into one local file and page it.
hdfs dfs -ls /user/cloudera/fma_output
hdfs dfs -cat /user/cloudera/fma_output/part-00000
hdfs dfs -getmerge /user/cloudera/fma_output /home/cloudera/fma_result.txt
less /home/cloudera/fma_result.txt

