Metadata-Version: 2.1
Name: rscylladb
Version: 0.0.4
Summary: Bulk records add into Cassandra or ScyllaDB.
Author: Reeya Patel and Meet Rathod
Author-email: 
Description-Content-Type: text/markdown


<p align="center">

  <a href="https://skillicons.dev">

    <img src="https://skillicons.dev/icons?i=git,cassandra,python" />

  </a>

</p>



## Bulk Installer ScyllaDB/Cassandra ![my badge](https://badgen.net/badge/version/0.0.1/red?icon=git)



Insert over 20 million records within minutes.



**Special thanks 🤝 to @[Reeya Patel](https://github.com/ReeyaPatel06)**



### Install using

```shell

pip install rscylladb

```

### Usage

```python

from rscylla.cql import Cql

# hostslist = ['127.0.0.1']

# port = 9042

# table_name = keyspace.table_name (required)

# username = cassandra

# password  = cassandra

obj = Cql(hostslist,port,table_name,username,password)

# default chunks = 10,000

# default workers = 4

obj.insert(filename,chuncks,workers) # filename or filepath

```

#### Example:

```python

from rscylla.cql import Cql

obj = Cql(["localhost"],9042,"keyspace.table_name","cassandra","cassandra")

obj.insert("data.csv") # optional: chuncks and workers

# way 2

obj.insert("data.csv",chuncks=1000,workers=2)

```

### Result

- Tested on a machine with 4 GB RAM and an i3 processor.



| Rows | Duration |
|:-----|:---------|
|10,000|0.03s|
|300,000|2m|
|20,000,000|66m|

**Note: Results depend on your hardware capability!**

## Workflow

1. Import packages

``` python

from cassandra.cluster import Cluster

from cassandra.auth import PlainTextAuthProvider # authenticate user

import pandas as pd

import numpy as np

import threading

import time

```



2. **class Cql** constructor accepts:

    - list of hosts : list of strings

    - port : integer

    - table_name : string, formatted as keyspace.table_name

    - username : string

    - password : string

```python

class Cql:

    def __init__(self,hosts:list,port:int,table_name:str,user:str,password:str) -> None:

```



3. In the constructor *\_\_init\_\_*, initialize the cluster and the authenticator used to connect to the ScyllaDB server.

```python

self.start_time = time.time() # start time, used to show the elapsed time on completion

# if table name is not formatted as keyspace.table_name then raise exception

if len(table_name.split("."))!=2: raise Exception("Table name must be! keyspace.table_name")

# table_name.split('.') returns [keyspace,table_name]

# keyspace

self.keyspace_name = table_name.split(".")[0]

# table name

self.table_name = table_name.split(".")[1]

# set authenticator with username & password

auth = PlainTextAuthProvider(username=user,password=password)

# set cluster, hosts,port and auth

self.cluster = Cluster(contact_points=hosts,port=port,auth_provider=auth)

# set keyspace

self.session = self.cluster.connect(self.keyspace_name)

```
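
The table-name check in step 3 can be isolated into a small helper. This is a minimal sketch, not the package's own code: `split_table_name` is a hypothetical name, and it raises `ValueError` instead of the bare `Exception` shown above. It also rejects empty parts such as `"keyspace."`.

```python
from typing import Tuple


def split_table_name(table_name: str) -> Tuple[str, str]:
    """Split 'keyspace.table' into (keyspace, table). Hypothetical helper."""
    parts = table_name.split(".")
    # Exactly two non-empty parts are required: keyspace and table.
    if len(parts) != 2 or not all(parts):
        raise ValueError("Table name must be keyspace.table_name")
    return parts[0], parts[1]


keyspace, table = split_table_name("school.student")
print(keyspace, table)
```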

4. *insert* method accepts:

    - file_name : string (file name or file path), required

    - chuncks and workers : optional for this method

```python

# chunks default = 10000

# workers default = 4

def insert(self,file_name:str,chuncks=10000,workers=4)->None:

```



5. Get all columns and data types of the given table to avoid data type conflicts.

    - The table's columns & data types are read from the *system_schema* keyspace.

```python

table_schema = self.session.execute("SELECT column_name,type FROM system_schema.columns WHERE keyspace_name='%s' AND table_name='%s'"%(self.keyspace_name,self.table_name))     

```
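
The query above interpolates the names directly into the CQL string. As an alternative sketch, the DataStax Python driver also accepts positional `%s` placeholders together with a parameter sequence, which leaves quoting to the driver. `build_schema_query` is a hypothetical helper used only for illustration.

```python
def build_schema_query(keyspace_name: str, table_name: str):
    """Return (query, params) for a parameterised schema lookup. Hypothetical helper."""
    query = (
        "SELECT column_name, type FROM system_schema.columns "
        "WHERE keyspace_name=%s AND table_name=%s"
    )
    return query, (keyspace_name, table_name)


query, params = build_schema_query("school", "student")
# With a live session this would be run as: session.execute(query, params)
print(query, params)
```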



6. Dictionary that maps CQL data types to Python data types.

    - CQL data type as key & Python data type as value

```python

self.cql_to_python_type = { 

            'NULL': None,

            'boolean': bool,

            'float': float,

            'double': float,

            'int':int,

            'smallint':int, 

            'tinyint':int,

            'counter':int,

            'varint':int,

            'bigint': int,

            'decimal': float,

            'ascii':str,

            'varchar':str,

            'text': str,

            'blob': str,

            'date': str,

            'timestamp': str,

            'time': str,

            'list': list,

            'set': set,

            'map': dict,

            'timeuuid': None,

            'uuid': None 

        }

```



7. Create the **self.cql_types** dictionary for the **pandas DataFrame**, and store all column names from table_schema (the given table's columns, extracted from the _system_schema_ keyspace).



```python

self.cql_types = {}

self.cql_columns=[]

# create dictionary for data frame: column_name:python_datatype

for row in table_schema: # iterate extracted column_names & data types

    # row contains column_name & type

    # create key as column_name & value as python data type get from cql_to_python_type dictionary

    self.cql_types[row.column_name] = self.cql_to_python_type[row.type]

    # add column name into list

    self.cql_columns.append(row.column_name)

```

Example:

```

    # if the student table contains two columns:

    1. id : bigint

    2. name : text

    

    # they are converted to

    cql_types = {

        "id":int,

        "name":str

    }

```
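
The loop in step 7 can be tried without a database. The sketch below uses a `namedtuple` as a stand-in for the rows returned by the schema query (an assumption made for illustration), and `build_column_types` is a hypothetical name. Unlike the indexing shown above, it uses `.get()`, so a type string that is not a key in the mapping yields `None` instead of raising `KeyError`.

```python
from collections import namedtuple

# Stand-in for rows returned by the system_schema.columns query (assumption).
Row = namedtuple("Row", ["column_name", "type"])

# Reduced copy of the mapping from step 6, enough for this example.
cql_to_python_type = {"bigint": int, "text": str, "uuid": None}


def build_column_types(rows, type_map):
    """Return (column -> Python type dict, ordered list of column names)."""
    types, columns = {}, []
    for row in rows:
        # .get() returns None for CQL types that are missing from the map.
        types[row.column_name] = type_map.get(row.type)
        columns.append(row.column_name)
    return types, columns


cql_types, cql_columns = build_column_types(
    [Row("id", "bigint"), Row("name", "text")], cql_to_python_type
)
print(cql_types, cql_columns)
```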

8. Create comma-separated strings for the insert statement

``` python

param_keys = ",".join(self.cql_columns) 

param_values = ",".join([ '%('+k+')s' for k in self.cql_columns])

self.insert__statement = "INSERT INTO "+self.table_name+" ("+param_keys+") VALUES ("+param_values+")"

```

Example:

```

if cql_columns = ['id','name']

then

param_keys = 'id,name'

# values are bound from a dictionary (JSON-like object)

param_values = '%(id)s,%(name)s'

# the execute method binds these param_values placeholders directly.

```
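
Step 8 can be checked in isolation. The sketch below wraps the same string concatenation in a function; `build_insert_statement` is a hypothetical name, not part of the package.

```python
def build_insert_statement(table_name, columns):
    """Build an INSERT statement with named %(col)s placeholders."""
    param_keys = ",".join(columns)
    param_values = ",".join("%(" + c + ")s" for c in columns)
    return "INSERT INTO " + table_name + " (" + param_keys + ") VALUES (" + param_values + ")"


statement = build_insert_statement("student", ["id", "name"])
print(statement)
# INSERT INTO student (id,name) VALUES (%(id)s,%(name)s)
```

With a live session, each row dictionary would then be bound by name, for example `session.execute(statement, {"id": 1, "name": "Asha"})`.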



9. Read the file based on its extension

    - pandas read_csv method accepts a file path & chunksize, which slices a huge file into chunks.

    - supported files are: .csv, .json, .xls, .xlsx

``` python

# supported files

file_ext = file_name.split(".")[-1] # get extension

if file_ext=="csv": # if csv then read csv with chuncks

    data_frames = pd.read_csv(file_name,chunksize=chuncks)

elif file_ext=="json":

    data_frames = pd.read_json(file_name,chunksize=chuncks,lines=True)

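elif file_ext in ["xls","xlsx"]:

    # pd.read_excel has no chunksize parameter, so load the sheet and slice it into chunks

    excel_df = pd.read_excel(file_name)

    data_frames = (excel_df.iloc[i:i+chuncks] for i in range(0,len(excel_df),chuncks))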

else:

    raise Exception("%s is not supported!"%(file_name))



```

Example:

```

data_frames holds a pandas TextFileReader object (for a CSV file)

```
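
The extension check in step 9 compares against lower-case strings, so a file named `DATA.CSV` would be rejected. A minimal sketch of a case-insensitive lookup using only the standard library follows; `pick_reader` and `SUPPORTED` are hypothetical names, and the function only returns the name of the pandas reader rather than calling it.

```python
import os

# Maps a lower-case extension to the pandas reader that would handle it.
SUPPORTED = {
    ".csv": "read_csv",
    ".json": "read_json",
    ".xls": "read_excel",
    ".xlsx": "read_excel",
}


def pick_reader(file_name: str) -> str:
    """Return the pandas reader name for a file. Hypothetical helper."""
    ext = os.path.splitext(file_name)[1].lower()
    if ext not in SUPPORTED:
        raise ValueError("%s is not supported!" % file_name)
    return SUPPORTED[ext]


print(pick_reader("exports/DATA.CSV"))
```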

10. Create a placeholder list with one slot per worker thread

``` python

threads = list(range(workers))

# e.g workers = 4

# threads = [0,1,2,3]

```

11. Start a while loop to read data_frames:

 - df = next(data_frames) fetches a data frame from the TextFileReader; every call fetches a new frame.

 - The first for loop assigns a thread object to the threads[i] position & starts the thread.

 - The Thread class receives the **execute_insert** method as target and the parameters as args = (dataframe,).

 - The second for loop waits for threads[i] to complete its execution.

```python

while True:

    try:

        # start thread

        for i in range(workers):

            # fetch next dataframe

            df = next(data_frames)

            threads[i] = threading.Thread(target=self.execute_insert,args=(df,))

            threads[i].start()

        

        # wait until complete task

        for i in range(workers):

            threads[i].join()

    # on exception break

    except Exception as e:

        print(e)

        break

```
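
In the loop above, when the file runs out part-way through a batch, `next` raises `StopIteration` and the loop breaks before reaching the join loop, so threads already started in that last batch are not joined there. The sketch below shows one possible variant that always joins what it started. `run_in_batches` and `record` are hypothetical names, and `record` stands in for **execute_insert**.

```python
import threading
from itertools import islice


def run_in_batches(chunks, worker, workers=4):
    """Run worker(chunk) in threads, at most `workers` threads per batch."""
    chunks = iter(chunks)
    while True:
        batch = list(islice(chunks, workers))  # take up to `workers` chunks
        if not batch:
            break  # input exhausted
        threads = [threading.Thread(target=worker, args=(chunk,)) for chunk in batch]
        for t in threads:
            t.start()
        # Join every started thread, including those of a short final batch.
        for t in threads:
            t.join()


results = []
lock = threading.Lock()


def record(chunk):
    # Stand-in for execute_insert: only records the chunk size.
    with lock:
        results.append(len(chunk))


run_in_batches([[1, 2], [3, 4], [5]], record, workers=2)
print(sorted(results))
```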



12. Create the **execute_insert** method.

 - It accepts a DataFrame as its parameter

``` python

def execute_insert(self,df:pd.DataFrame)->None:

```



13. In the method:

 - Convert the column names to lower case (cql has all lower-case column names).

 - Handle null values:

   - **df.select_dtypes(include=np.number).columns** gets the names of the columns with a numeric data type (float, int).

   - **df.select_dtypes(include=np.number).fillna(0)** selects the numeric columns & fills nulls with 0.

   - **df.select_dtypes(exclude=np.number).fillna('')** selects the non-numeric columns & fills nulls with an empty string.

 - Iterate over df (the dataframe) and call **session.execute_async** with the insert statement & each row (dict | JSON object).

   - **session.execute_async** returns a future object, so result() must be called to make sure the execution has completed.

   - Append each future returned by **session.execute_async** to the **futures list**.

 - A second loop iterates over the future objects & calls result() on each one, which waits for every **session.execute_async** call to complete.



**NOTE: pandas puts NaN wherever a value is null and cql raises an exception for it, so nulls are filled with 0 (numeric columns) or an empty string (other columns).**

 ``` python

# lower-case the column names for cql

df.columns = map(str.lower,df.columns)

# handle nulls in numeric columns: fill with 0

df[df.select_dtypes(include=np.number).columns] = df.select_dtypes(include=np.number).fillna(0) 

# handle nulls in object and string columns: fill with empty string

df[df.select_dtypes(exclude=np.number).columns] = df.select_dtypes(exclude=np.number).fillna('') 

# empty list of future objects

futures = []

# execute async & store future object

for row in df.to_dict(orient='records'): # iterate over each record

    futures.append(self.session.execute_async(self.insert__statement,row))

# wait for the async inserts to complete

for future in futures:

    future.result() # block until this insert query finishes

```
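
The submit-then-wait pattern of step 13 can be illustrated without a database. In this sketch the standard library's `concurrent.futures` stands in for the driver: its `Future` is not the driver's future class, and only the blocking `result()` call is analogous. `fake_insert` is a hypothetical placeholder for a single INSERT.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_insert(row):
    # Stand-in for one INSERT; returns the row id so the outcome can be checked.
    return row["id"]


rows = [{"id": 1, "name": "a"}, {"id": 2, "name": ""}]

with ThreadPoolExecutor(max_workers=2) as pool:
    # Phase 1: submit every row without waiting for any of them.
    futures = [pool.submit(fake_insert, row) for row in rows]
    # Phase 2: block on each future; result() re-raises any error from the task.
    inserted_ids = [future.result() for future in futures]

print(inserted_ids)
```

Submitting everything first lets the requests overlap; calling `result()` right after each submit would run them one at a time.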



# Authors :

- ### 🙋‍♀️ [Reeya Patel](https://github.com/ReeyaPatel06)

- ### 🙋‍♂️ [Meet Rathod](https://github.com/MeetRathod0)





