Metadata-Version: 2.1
Name: cphoenix
Version: 0.5.7rc8
Summary: Function to help Data Scientist work more effectively with DWH
Home-page: https://github.com/pypa/sampleproject
Author: dungvc
Author-email: duyvnc@fpt.com.vn
License: dungvc
Project-URL: Bug Tracker, https://github.com/pypa/sampleproject/issues
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.6
Description-Content-Type: text/markdown

-----------------

# spark-sdk: Functions to help data scientists work more effectively with a data lake
[![PyPI Latest Release](https://img.shields.io/badge/pypi-0.4.21-blue)](https://pypi.org/project/spark-sdk/)
[![Package Status](https://img.shields.io/badge/status-stable-green)](https://pypi.org/project/spark-sdk/)
[![Downloads](https://static.pepy.tech/personalized-badge/spark-sdk?period=month&units=international_system&left_color=black&right_color=orange&left_text=PyPI%20downloads%20per%20month)](https://pepy.tech/project/spark-sdk)
[![Powered by NumFOCUS](https://img.shields.io/badge/powered%20by-CADS-orange.svg?style=flat&colorA=E1523D&colorB=007D8A)](https://blog.cads.live/)



## What is it?

**spark-sdk** provides functions that help data scientists work more effectively with a data lake. It includes helpers for working with Spark and PyArrow.

## Main Features
Here are just a few of the things that spark-sdk does well:
  - Get a Spark session on the newest supported version:
    the `PySpark()` function builds a session that matches your requirements.
  - Read and write data in the data lake with little code.
  - Encrypt or decrypt data with a user-supplied key.
  - Work with the distributed file system (data lake) through helper functions.


## Install
Binary installers for the latest released version are available at the [Python
Package Index (PyPI)](https://pypi.org/project/spark-sdk).


```sh
# with PyPI
pip install spark-sdk
```

## Dependencies
- [pyspark - Apache Spark Python API](https://spark.apache.org/docs/latest/api/python/)
- [pyarrow - Python library for Apache Arrow](https://arrow.apache.org/docs/python/index.html)
- [pandas - Powerful data structures for data analysis, time series, and statistics](https://pandas.pydata.org/)


## Installation from sources
To install spark-sdk from source you need [Cython](http://git.bigdata.local/data-engineers/sdk/utilities/-/tree/master/spark_sdk) in addition to the normal
dependencies above. Cython can be installed from PyPI:

```sh
pip install cython
```

In the `spark-sdk` directory (same one where you found this file after
cloning the git repo), execute:

```sh
python setup.py install
```




## Get Spark
```python
import spark_sdk as ss
spark = ss.PySpark(yarn=False, num_executors=4, driver_memory='8G').spark

# Yarn mode
spark = ss.PySpark(yarn=True, driver_memory='2G', num_executors=4, executor_memory='4G').spark

# Add more spark config
spark = ss.PySpark(yarn=False, driver_memory='8G', num_executors=4, executor_memory='4G',
                  add_on_config1=("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog"),
                  add_on_config2=('spark.databricks.delta.retentionDurationCheck.enabled', 'false')
                  ).spark
```
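
Each `add_on_config1`, `add_on_config2` keyword argument above carries a `(key, value)` tuple. How `PySpark` consumes them internally is not shown in this README; the following is a minimal sketch, assuming they are simply folded into a dictionary of Spark properties. The helper `collect_add_on_config` is hypothetical and not part of spark-sdk.

```python
def collect_add_on_config(**kwargs):
    """Hypothetical helper: gather add_on_config* tuples into a dict."""
    conf = {}
    for name, value in kwargs.items():
        # Only keyword arguments named add_on_config<N> are treated as Spark properties.
        if not name.startswith("add_on_config"):
            continue
        key, setting = value  # each argument is expected to be a (key, value) tuple
        conf[key] = setting
    return conf


spark_conf = collect_add_on_config(
    driver_memory="8G",
    add_on_config1=("spark.sql.catalog.spark_catalog",
                    "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
    add_on_config2=("spark.databricks.delta.retentionDurationCheck.enabled", "false"),
)
for key, setting in spark_conf.items():
    print(f"{key} = {setting}")
```
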
## Store data to dwh
```python

# step 1 read data
ELT_DATE = '2021-12-01'
ELT_STR = ELT_DATE[:7]
import pandas as pd
df1 = pd.read_csv('./data.csv', sep='\t')


import spark_sdk as ss

# store the DataFrame in the DWH
df1.to_dwh(
    hdfs_path="path/to/data/test1.parquet/", # HDFS path
    partition_by="m", # time column to partition by
    partition_date=ELT_DATE, # partition date
    database="database_name", # database name
    table_name="test1", # table name
    repartition=True
)
```
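
The `m` partition values used throughout this README look like `'2022-04-01'`, which suggests (but does not confirm) that `partition_date` is normalized to the first day of its month. Below is a minimal sketch of that normalization using only the standard library; `month_partition` is a hypothetical name, not a spark-sdk function.

```python
from datetime import date


def month_partition(partition_date: str) -> str:
    """Hypothetical helper: map an ISO date to the first day of its month."""
    parsed = date.fromisoformat(partition_date)  # raises ValueError on malformed input
    return parsed.replace(day=1).isoformat()


ELT_DATE = "2021-12-15"
print(month_partition(ELT_DATE))  # 2021-12-01
```
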

## Function to read from dwh
```python
# method 1: using pandas
import spark_sdk as ss
import pandas as pd

pandas_df = pd.read_dwh('database.table_name', filters="""m='2022-04-01'""")

# method 2: sql
sparkDF = ss.sql("""
SELECT *
FROM database_name.table1
WHERE m='2022-04-01'
""")

df = sparkDF.toPandas()


# method 3: read_table
sparkDF = ss.read_table('database_name.table')
sparkDF = sparkDF.filter("m == '2022-04-01'")

df = sparkDF.toPandas()


# If you get a "timestamp out of range" error
df = ss.limit_timestamp(sparkDF).toPandas()

# To delete the table together with its data:
# the HDFS files are linked to the table,
# so dropping the table also deletes the data
ss.drop_table_and_delete_data('database_name.test1')

# To drop only the table and keep the data
ss.drop_table('database_name.test1')
```
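
The "timestamp out of range" error typically appears because pandas stores `datetime64[ns]` values, which only cover roughly the years 1677 to 2262, while Spark timestamps can fall outside that window. What `limit_timestamp` does internally is not documented here; the sketch below shows one plausible approach, clamping values into a safe range, using plain `datetime` objects. The bounds and the name `clamp_timestamp` are assumptions for illustration.

```python
from datetime import datetime

# Conservative bounds inside the range pandas can represent at nanosecond precision.
PANDAS_SAFE_MIN = datetime(1677, 9, 22)
PANDAS_SAFE_MAX = datetime(2262, 4, 11)


def clamp_timestamp(value: datetime) -> datetime:
    """Hypothetical helper: pull an out-of-range timestamp back into the safe range."""
    return min(max(value, PANDAS_SAFE_MIN), PANDAS_SAFE_MAX)


rows = [datetime(1, 1, 1), datetime(2022, 4, 1), datetime(9999, 12, 31)]
print([clamp_timestamp(ts).isoformat() for ts in rows])
```
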

## Delta format
### function to store to dwh
```python
df1.to_dwh(
    # end the path with ".delta" and the table is stored in Delta format
    hdfs_path="path/to/data/test1.delta/",
    partition_by="m", # time column to partition by
    partition_date=ELT_DATE, # partition date
    database="database_name", # database name
    table_name="test1", # table name
    repartition=True
)

# query the table, not the underlying files
sparkDF = ss.sql("""
SELECT *
FROM database_name.table1
WHERE m='2022-04-01'
""")

```
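
Since the storage format is selected by the path suffix, the rule can be pictured as a small function. This is only a sketch of the idea, not spark-sdk's code: `infer_storage_format` is a hypothetical name, and falling back to Parquet for any other suffix is an assumption.

```python
def infer_storage_format(hdfs_path: str) -> str:
    """Hypothetical helper: choose a storage format from the path suffix."""
    # Drop trailing slashes, then take whatever follows the last dot.
    suffix = hdfs_path.rstrip("/").rsplit(".", 1)[-1].lower()
    # Assumption: anything that is not ".delta" is written as Parquet.
    return "delta" if suffix == "delta" else "parquet"


for path in ("path/to/data/test1.delta/", "path/to/data/test1.parquet/"):
    print(path, "->", infer_storage_format(path))
```
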


## Decrypt Data

### method 1: sql
Using Spark SQL
```python
import spark_sdk as ps

key = '1234' # contact Data Owner to get key

df = ps.sql(f"""
select fdecrypt(column_name, "{key}") column_name
from database_name.table_name
limit 50000
""").toPandas()
```


### method 2: using pandas
Using pandas
```python
import spark_sdk as ss

key = '1234' # contact Data Owner to get key
df['new_column'] = df['column'].decrypt_column(key)

# df now holds the decrypted values in new_column
```

### method 3: using pandas apply
Using the pandas `apply` function
```python
from spark_sdk import decrypt
df['decrypted'] = df['encrypted'].apply(decrypt,args=("YOUR_KEY",))
```

## Encrypt data
```python
import spark_sdk as ss


# store the DataFrame in the DWH, encrypting the listed columns
df.to_dwh(
    hdfs_path="path/to/data/test1.parquet/", # HDFS path
    partition_by="m", # time column to partition by
    partition_date=ELT_DATE, # partition date
    database="database_name", # database name
    table_name="test1", # table name
    encrypt_columns=['column1','column2'], # names of the columns to encrypt
)

ss.PySpark().stop()
```

## Create YAML file mapping data from CSV to DWH
```python
from spark_sdk import CreateYamlDWH
create_yaml = CreateYamlDWH(
    csv_file='data.csv',
    hdfs_path='/path/to/data/table_name.parquet',
    sep='\t',
    database_name='database_name',
    table_name='table_name',
    yaml_path='/path/to/yaml/file/'
)

create_yaml.generate_yaml_file()
```
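
The layout of the generated YAML file is not shown in this README. To give a feel for the kind of information such a mapping needs, the sketch below infers a simple type for each column of a tab-separated file using only the standard library. The function names and the `int`/`float`/`string` labels are illustrative assumptions, not the output of `CreateYamlDWH`.

```python
import csv
import io


def infer_type(values):
    """Return 'int', 'float' or 'string' for a list of raw CSV strings."""
    for caster, name in ((int, "int"), (float, "float")):
        try:
            for value in values:
                caster(value)
            return name
        except ValueError:
            continue  # at least one value does not parse, try the next type
    return "string"


def describe_csv(text, sep="\t"):
    """Hypothetical helper: map each CSV column to an inferred type."""
    rows = list(csv.reader(io.StringIO(text), delimiter=sep))
    header, body = rows[0], rows[1:]
    columns = list(zip(*body))  # transpose rows into columns
    return {name: infer_type(col) for name, col in zip(header, columns)}


sample = "user_id\tamount\tcity\n1\t10.5\tHanoi\n2\t7\tHue\n"
print(describe_csv(sample))
```
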

## Store spark.sql.DataFrame
```python
import spark_sdk as ss
sparkDF = ss.sql("""select * from database.table_name limit 1000""")

ELT_DATE = '2022-06-10'
sparkDF.to_dwh(
    hdfs_path="path/to/data/test6.parquet/", # HDFS path
    partition_by="m", # time column to partition by
    partition_date=ELT_DATE, # partition date
    database="database_name", # database name
    table_name="test6", # table name
    repartition=True,
    driver_memory='4G', executor_memory='4g', num_executors='1', port='4090', yarn=True
)
```

## Read data
```python
# sql
import spark_sdk as ss
sparkDF = ss.sql("""select * from database.table_name limit 1000""")

# function
sparkDF = ss.read_parquet("hdfs:/path/to/file.parquet")
sparkDF = ss.read_csv("hdfs:/path/to/file.csv")
sparkDF = ss.read_json("hdfs:/path/to/file.json")

```
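
The examples in this README use `hdfs:` URIs or bare paths for HDFS files and `file:///` URIs for local files. A minimal sketch of how a path could be classified by its scheme, using the standard library's `urlparse`; `storage_of` is a hypothetical helper, and treating every scheme-less path as HDFS is an assumption drawn from those examples.

```python
from urllib.parse import urlparse


def storage_of(path: str) -> str:
    """Hypothetical helper: classify a path as 'local' or 'hdfs' by its scheme."""
    scheme = urlparse(path).scheme
    if scheme == "file":
        return "local"
    # Assumption: bare paths and hdfs: URIs both refer to HDFS.
    return "hdfs"


for path in ("hdfs:/path/to/file.parquet", "file:///path/to/data.csv", "/path/to/data.csv"):
    print(path, "->", storage_of(path))
```
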

## Working with hdfs
Available helper functions include `mkdir`, `cat`, `exists`, `info`, and `open`.

### list file
```python
ss.ls('/path/data')
```

### create new path
```python
ss.mkdir('/path/create/')
```

### check whether a path exists
```python
ss.exists('/check/path/exists') # return True if exists
```

### print info
```python
ss.info('/path/info/')
```

### open file like local file
```python
import json
with ss.open('/path/to/file.json', mode='rb') as file:
    data = json.load(file)
```
```python
import json
from spark_sdk import Utf8Encoder
data = {'user': 1, 'code': '456'}
with ss.open('/path/to/file.json', mode='wb') as file:
    json.dump(data, Utf8Encoder(file), indent=4)
    
    
# read and write JSON directly
json_file = ss.read_json('/path/to/file.json')
ss.write_json(data, '/path/to_file.json')

sparkDF = ss.read_csv("file:///path/to/data.csv", sep='\t') # local file
sparkDF = ss.read_csv("/path/to/data.csv", sep='\t') # HDFS file

# ss.open supports the modes 'rb', 'wb' and 'ab'
```
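
`json.dump` emits text, whereas a file opened in `'wb'` mode accepts only bytes, which is presumably why the example above wraps the file in `Utf8Encoder`. The same idea can be sketched with the standard library alone: here `io.BytesIO` stands in for the HDFS file, and `codecs.getwriter` plays the role that `Utf8Encoder` appears to play (an assumption, since its implementation is not shown).

```python
import codecs
import io
import json

data = {"user": 1, "code": "456"}

# A binary stream stands in for a file opened with mode='wb'.
binary_file = io.BytesIO()

# json.dump writes str, so wrap the binary stream in a UTF-8 encoding writer.
text_writer = codecs.getwriter("utf-8")(binary_file)
json.dump(data, text_writer, indent=4)

# Reading back: json.loads accepts the UTF-8 bytes directly.
restored = json.loads(binary_file.getvalue())
print(restored == data)
```
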

