######
## BLOOM FILTER
###

####
# Mapper.py
###
#!/usr/bin/env python
import sys
import re

# Number of bit positions in the mapper's Bloom filter. Tune this to the
# number of distinct IPs expected in the input.
M = 10 ** 6  # 1,000,000 bit positions

def is_valid_ip(ip):
    """Return True if *ip* is a dotted-quad IPv4 address.

    The string must consist of exactly four dot-separated groups of one to
    three ASCII digits, and each group must be in the range 0-255, so
    shapes like "999.1.1.1" or a trailing newline are rejected.
    """
    ip_pattern = r'^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\Z'
    if re.match(ip_pattern, ip) is None:
        return False
    # The regex only checks the shape; enforce the per-octet numeric range.
    return all(int(octet) <= 255 for octet in ip.split('.'))

def ip_to_number(ip):
    """Convert a dotted-quad IPv4 string to its 32-bit integer value.

    Returns 0 when *ip* does not have exactly four dot-separated parts,
    when a part is not an integer, or when a part lies outside 0-255.
    Note that the valid address "0.0.0.0" also maps to 0.
    """
    parts = ip.split('.')
    if len(parts) != 4:
        return 0
    try:
        octets = [int(p) for p in parts]
    except ValueError:
        return 0
    # An octet outside 0-255 would bleed into the neighbouring byte and
    # produce a number belonging to a different address; reject it.
    for octet in octets:
        if octet < 0 or octet > 255:
            return 0
    return (octets[0] << 24) + (octets[1] << 16) + (octets[2] << 8) + octets[3]

def main():
    """Read CSV lines from stdin and emit the first sighting of each IP.

    The IP address is taken from the first comma-separated column. Each
    emitted line has the form ``<ip><TAB><original line>``. Lines whose IP
    fails validation, or that the Bloom filter reports as already seen, are
    not emitted; a message is written to stderr instead.

    The filter is local to this process. It may report a new IP as already
    seen (a false positive, so the record is dropped) but never the
    reverse. It only sees the lines read by this one mapper.
    """
    # One byte per bit position: far more compact than a list of M ints,
    # and indexing still yields 0/1 integers.
    bloom_filter = bytearray(M)

    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue

        try:
            # Extract the IP address (first column). str.split always
            # returns at least one element, so index 0 is always present.
            ip_address = line.split(',')[0].strip()

            # Validate IP address format
            if not is_valid_ip(ip_address):
                sys.stderr.write("Invalid IP format: {0}\n".format(ip_address))
                continue

            # Convert IP to a number for hashing; 0 is ip_to_number's
            # failure sentinel (this also skips the literal 0.0.0.0).
            ip_num = ip_to_number(ip_address)
            if ip_num == 0:
                continue

            # The two Bloom filter hash functions.
            positions = (ip_num % M, (3 * ip_num + 2) % M)

            # Bloom membership test: a key counts as "seen" only when ALL of
            # its positions are set. Testing them one at a time (as before)
            # let the second copy of every IP through.
            if all(bloom_filter[p] for p in positions):
                sys.stderr.write("Duplicate detected: {0}\n".format(ip_address))
                continue

            # New key: set every one of its positions, then emit it.
            for p in positions:
                bloom_filter[p] = 1
            print("{0}\t{1}".format(ip_address, line))

        except Exception as e:
            sys.stderr.write("Error processing line: {0} - {1}\n".format(line, str(e)))
            continue

    # Output final Bloom filter state for monitoring
    sys.stderr.write("Final Bloom's filter size: {0}\n".format(M))


if __name__ == "__main__":
    main()

####
#REDUCER.py
####

#!/usr/bin/env python
import sys

def main():
    """Write one record per IP from the mapper output on stdin.

    Each input line is expected as ``<ip><TAB><original record>``. A line
    without a tab is used as both key and record. Blank lines are skipped.

    This relies on reducer input being sorted by key, as the MapReduce
    shuffle delivers it, so records for the same IP are on consecutive
    lines. Only the first record of each run is written. That removes
    duplicates the separate mapper tasks, each with its own Bloom filter,
    could not see. The number of records written is reported on stderr.
    """
    unique_count = 0
    previous_key = None

    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue

        try:
            # Split by tab to get the IP (key) and the full record.
            parts = line.split('\t', 1)
            if len(parts) == 2:
                key, full_record = parts
            else:
                # Tab separator missing: treat the whole line as key and record.
                key = full_record = line

            if key == previous_key:
                # Same key as the preceding line: a duplicate record.
                continue
            previous_key = key

            print(full_record)
            unique_count += 1

        except Exception as e:
            sys.stderr.write("Error processing line: {0} - {1}\n".format(line, str(e)))
            continue

    # Output statistics
    sys.stderr.write("Total unique IP records: {0}\n".format(unique_count))


if __name__ == "__main__":
    main()


####
# COMMANDS
###

# 1) Stage the input data in HDFS.
hdfs dfs -mkdir -p /user/cloudera/input
hdfs dfs -put bloom_data.csv /user/cloudera/input/

# 2) Make the scripts executable (only needed if they are ever run directly
#    through their shebang line instead of via an explicit interpreter).
chmod +x /home/cloudera/mapper.py
chmod +x /home/cloudera/reducer.py

# 3) The job fails if the output directory already exists; -f keeps this
#    quiet on the first run.
hdfs dfs -rm -r -f /user/cloudera/output/bloom_output

# 4) Run the job. Generic options (-D, -files) must come BEFORE the
#    streaming options (-input, -mapper, ...). -files ships the scripts into
#    each task's working directory, so they are referenced by bare name.
#    Script failures are no longer masked (stream.non.zero.exit.is.failure
#    keeps its default of true).
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.name="BloomFilter_IP_Deduplication" \
    -D mapreduce.map.output.compress=true \
    -D mapreduce.job.reduces=1 \
    -files /home/cloudera/mapper.py,/home/cloudera/reducer.py \
    -input /user/cloudera/input/bloom_data.csv \
    -output /user/cloudera/output/bloom_output \
    -mapper "python2.6 mapper.py" \
    -reducer "python2.6 reducer.py" \
    -cmdenv PYTHONIOENCODING=utf8

# 5) Inspect the results.
hdfs dfs -ls /user/cloudera/output/bloom_output
hdfs dfs -cat /user/cloudera/output/bloom_output/part-00000

# 6) Fetch the task logs (the scripts write diagnostics to stderr). Set
#    APP_ID to the application ID printed by the job first.
yarn logs -applicationId "${APP_ID:?set APP_ID to the application ID printed by the job}"


#######
# ANOTHER BLOOM FILTER EXAMPLE (flight-record deduplication)
######

###
#Mapper
###

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import hashlib

# Bloom filter parameters.
# For n distinct keys, m bits and k hashes the false-positive rate is about
# (1 - exp(-k*n/m))**k. With m = 10,000 and k = 5 that is already ~10% after
# only ~2,000 keys, so unique records were silently dropped. m = 1,000,000
# keeps it under ~1% up to ~100,000 distinct keys. Raise it for larger inputs.
BIT_ARRAY_SIZE = 1000000
HASH_COUNT = 5
# One byte per bit position: compact, and indexing yields 0/1 integers.
bit_array = bytearray(BIT_ARRAY_SIZE)


def get_hashes(value):
    """Return HASH_COUNT bit positions in [0, BIT_ARRAY_SIZE) for *value*.

    Position i is the MD5 digest of ``value + str(i)`` (as an integer)
    modulo BIT_ARRAY_SIZE. Text strings are hashed as UTF-8. Byte strings
    (Python 2 ``str``) are hashed as-is; the old implicit ``.encode()``
    ASCII-decoded them first and failed on non-ASCII input.
    """
    positions = []
    for i in range(HASH_COUNT):
        data = value + str(i)
        if not isinstance(data, bytes):
            data = data.encode('utf-8')
        digest = hashlib.md5(data).hexdigest()
        positions.append(int(digest, 16) % BIT_ARRAY_SIZE)
    return positions

def main():
    """Emit each distinct flight key read from stdin once (per mapper).

    Each CSV line is expected to start with Date, FlightNum, Carrier,
    Origin, Dest. Those five fields, whitespace-trimmed, are joined with
    "_" into the key that is printed. Header lines (starting with "Date")
    and lines with fewer than five fields are skipped. Keys the Bloom
    filter reports as already seen are not printed.
    """
    for line in sys.stdin:
        if line.startswith("Date"):  # skip header
            continue

        # Trim each field so that e.g. " AA" and "AA" yield the same key.
        fields = [field.strip() for field in line.strip().split(",")]
        if len(fields) < 5:  # expect: Date, FlightNum, Carrier, Origin, Dest
            continue

        date, flight_number, carrier_code, origin, dest = fields[:5]

        # old Python (2.6) needs indexed placeholders
        key = "{0}_{1}_{2}_{3}_{4}".format(date, flight_number, carrier_code, origin, dest)

        # Bloom filter check: skip if every position is already set,
        # otherwise mark all positions and emit the key.
        hashes = get_hashes(key)
        if all(bit_array[h] == 1 for h in hashes):
            continue
        for h in hashes:
            bit_array[h] = 1

        print(key)


if __name__ == "__main__":
    main()

####
#reducer
###

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys

def main():
    """Print each distinct, non-blank line from stdin once.

    This relies on reducer input being sorted by key, as the MapReduce
    shuffle delivers it, so equal keys arrive on consecutive lines. Only
    the first line of each run is printed, which removes duplicates that
    separate mapper tasks (each with its own Bloom filter) could not
    detect. Lines are whitespace-stripped; this also removes the trailing
    tab streaming may append to a key with an empty value.
    """
    previous = None
    for line in sys.stdin:
        key = line.strip()
        if not key or key == previous:
            continue
        previous = key
        print(key)


if __name__ == "__main__":
    main()

####
#COMMANDS
####

# 1) Inspect the scripts: permissions, and `file` reports whether they were
#    saved with CRLF (Windows) line terminators.
ls -l mapper.py reducer.py
file mapper.py reducer.py

# 2) The job fails if an output directory already exists; remove leftovers
#    from earlier runs (-f keeps this quiet when they do not exist).
hdfs dfs -rm -r -f /user/cloudera/flights_output /user/cloudera/flights_output_maponly

# 3) Full run: Bloom-filter mapper plus one reducer that removes duplicates
#    the separate mappers could not see. Generic options (-D, -files) must
#    come BEFORE the streaming options (-input, -mapper, ...).
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -D mapreduce.job.name="BloomFilter_Flights" \
  -D mapreduce.job.reduces=1 \
  -files mapper.py,reducer.py \
  -input /user/cloudera/flights_input/flights.csv \
  -output /user/cloudera/flights_output \
  -mapper "python2.6 mapper.py" \
  -reducer "python2.6 reducer.py" \
  -cmdenv PYTHONIOENCODING=utf8

# 4) Map-only run: with zero reduce tasks the mapper output is written
#    directly (per mapper, unsorted, not deduplicated across mappers), so no
#    -reducer is given.
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -D mapreduce.job.name="BloomFilter_Flights_maponly" \
  -D mapreduce.job.reduces=0 \
  -files mapper.py \
  -input /user/cloudera/flights_input/flights.csv \
  -output /user/cloudera/flights_output_maponly \
  -mapper "python2.6 mapper.py" \
  -cmdenv PYTHONIOENCODING=utf8
