Metadata-Version: 2.1
Name: rscylladb
Version: 1.0.5
Summary: Bulk records add into Cassandra or ScyllaDB.
Home-page: UNKNOWN
Author: Reeya Patel and Meet Rathod
Author-email: 
License: UNKNOWN
Description: 
        <p align="center">
          <a href="https://skillicons.dev">
            <img src="https://skillicons.dev/icons?i=git,cassandra,python" />
          </a>
        </p>
        
        ## Bulk Installer ScyllaDB/Cassandra ![my badge](https://badgen.net/badge/version/0.0.1/red?icon=git)
        
        Insert over 20Million records within minutes.
        
        **Special thanks 🤝 to @[Reeya Patel](https://github.com/ReeyaPatel06)**
        
        ### Install using
        ```shell
        pip install rscylla
        ```
        ### Usage
        ```python
        from rscylla.cql import Cql
        # hostslist = ['127.0.0.1']
        # port = 9042
        # table_name = keyspace.table_name [must required]
        # username = cassandra
        # password  = cassandra
        obj = Cql(hostslist,port,table_name,username,password)
        # default chunks = 10,000
        # default workers = 4
        obj.insert(filename,chuncks,workers) # filename or filepath
        ```
        #### Example:
        ```python
        from rscylla.cql import Cql
        obj = Cql(["localhost"],9042,"cassandra","cassandra")
        obj.insert("data.csv") # optional : chuncks and workers
        # way 2
        obj.insert("data.csv",chunks=1000,workers=2)
        ```
        ### Result
        - Tested on 4GB RAM with i3 processor.
        
        | Rows | Duration |
        |:-----|:---------|
        |10,000|0.03s|
        |3,00,000|2m|
        |2,00,00,000|66m|
        **Note: Depends on you hardware capability!**
        ## Work flow
        1. Import packages
        ``` python
        from cassandra.cluster import Cluster
        from cassandra.auth import PlainTextAuthProvider # authenticate user
        import pandas as pd
        import numpy as np
        import threading
        import time
        ```
        
        2. **class Cql** constructor accept:
            - list of hosts : list of string
            - port : integer
            - username : string
            - password : string
        ```python
        class Cql:
            def __init__(self,hosts:list,port:int,table_name:str,user:str,password:str) -> None:
        ```
        
        3. In the constructor *_ _ init _ _* intialize cluster and authenticator for connect ScyllaDb server.
        ```python
        self.start_time = time.time() # for completion the time show
        # if table name is not formated as keyspace.table_name then raise exception
        if len(table_name.split("."))!=2: raise Exception("Table name must be! keyspace.table_name")
        # table_name.split('.') returns [keyspace,table_name]
        # keyspace
        self.keyspace_name = table_name.split(".")[0]
        # table nme
        self.table_name = table_name.split(".")[1]
        # set authenticator with username & password
        auth = PlainTextAuthProvider(username=user,password=password)
        # set cluster, hosts,port and auth
        self.cluster = Cluster(contact_points=hosts,port=port,auth_provider=auth)
        # set keyspace
        self.session = self.cluster.connect(self.keyspace_name)
        ```
        4. *insert* method accept:
            - For this method chunks and workers are optional
        ```python
        # chunks default = 10000
        # workers default = 4
        def insert(self,file_name:str,chuncks=10000,workers=4)->None:
        ```
        
        5. Get inputed table's all columns and data types to avoid data type conflicts.
            - get table's columns & data types from *system_schema* keyspace. 
        ```python
        table_schema = self.session.execute("SELECT column_name,type FROM system_schema.columns WHERE keyspace_name='%s' AND table_name='%s'"%(self.keyspace_name,self.table_name))     
        ```
        
        6. Dictinary which contains cql data types & with python data types.
            - cql data type as key & python data type as value
        ```python
        self.cql_to_python_type = { 
                    'NULL': None,
                    'boolean': bool,
                    'float': float,
                    'double': float,
                    'int':int,
                    'smallint':int, 
                    'tinyint':int,
                    'counter':int,
                    'varint':int,
                    'bigint': int,
                    'decimal': float,
                    'ascii':str,
                    'varchar':str,
                    'text': str,
                    'blob': str,
                    'date': str,
                    'timestamp': str,
                    'time': str,
                    'list': list,
                    'set': set,
                    'map': dict,
                    'timeuuid,': None,
                    'uuid': None 
                }
        ```
        
        7. Create **self.cql_types** dictionary for **pandans Data Frame**. Store all columns names from table_schema (exectracted from _schema_keyspace_ [inputed table's cols])
        
        ```python
        self.cql_types = {}
        self.cql_columns=[]
        # create dictonary for data frame: column_name:python_datatype
        for row in table_schema: # iterate extracted column_names & data types
            # row contains column_name & type
            # create key as column_name & value as python data type get from cql_to_python_type dictionary
            self.cql_types[row.column_name] = self.cql_to_python_type[row.type]
            # add column name into list
            self.cql_columns.append(row.column_name)
        ```
        Example:
        ```
            # if student table contnains two rows:
            1. id : bigint
            2. name : text
            
            # which converted as
            cql_types = {
                "id":int,
                "name":str
            }
        ```
        8. Create commana seprated string for insert statement
        ``` python
        param_keys = ",".join(self.cql_columns) 
        param_values = ",".join([ '%('+k+')s' for k in self.cql_columns])
        self.insert__statement = "INSERT INTO "+self.table_name+" ("+param_keys+") VALUES ("+param_values+")"
        ```
        Example:
        ```
        if cql_columns = ['id','name']
        then
        param_keys = 'id,name'
        # for bind value as dictionary (json object)
        param_values = '%(id)s,%(name)s'
        # param_values direct bind py execute method.
        ```
        
        9. Read file based on their extension
            - pandas read_csv method accept data frame & chunksize which slice huge file into chunks.
            - supported files are: .csv, .json, .xls, .xlsx
        ``` python
        # supported files
        file_ext = file_name.split(".")[-1] # get extension
        if file_ext=="csv": # if csv then read csv with chuncks
            data_frames = pd.read_csv(file_name,chunksize=chuncks)
        elif file_ext=="json":
            data_frames = pd.read_json(file_name,chunksize=chuncks,lines=True)
        elif file_ext in ["xls","xlsx"]:
            data_frames = pd.read_excel(file_name,chunksize=chuncks)
        else:
            raise Exception("%s is not supported!"%(file_name))
        
        ```
        Example:
        ```
        data__frames has Pandas TextFileReader object 
        ```
        10. Create empty thread list
        ``` python
        threads = list(range(workers))
        # e.g workers = 4
        # threads = [0,1,2,3]
        ```
        11. Start while loop for read data_frames:
         - df = next(data_frames) fetch data frame from TextFileReader, Every loop fetch new frame
         - First for loop assigin thread object into threads[i] position & start thread
         - In the thread class we pass **execute_insert** method and parameters as args = (dataframe,)
         - Second for loop wait for threads[i] complete the execution.
        ```python
        while True:
            try:
                # start thread
                for i in range(workers):
                    # fetch next dataframe
                    df = next(data_frames)
                    threads[i] = threading.Thread(target=self.execute_insert,args=(df,))
                    threads[i].start()
                
                # wait until complete task
                for i in range(workers):
                    threads[i].join()
            # on execption break
            except Exception as e:
                print(e)
                break
        ```
        
        12. Created method **execute_insert**.
         - Accept parameter DataFrame
        ``` python
        def execute_insert(self,df:pd.DataFrame)->None:
        ```
        
        13. In the method:
         -  Make lower case column names. (cql have all lower case columns names)
         - handle null type for numeric values:
           - **df.select_dtypes(include=np.number).columns** get column name which have data type float,int
           - **df.select_dtypes(include=np.number).fillna(0)** select columns & fill 0 if null present
           - **df.select_dtypes(exclude=np.number).fillna('')** select non numeric cols and fill empty string.
         - Iterate df (dataframe) with **session.execute_async** method with insert statement & row(dict|json object).
           - Method **session.execute_async** return future object so result() is required for completed execute.
           - Add **session.execute_async** into **futures list**
         - Second loop iterate each future object & call result() method. By using this loop wait for all **session.execute_async** method to complete execution.
        
        **NOTE: In dataframe Pandas.NaN set by default where null value is located and cql raise exception so we have to fill 0 & empty string**
         ``` python
        # make lower columns name for cql
        df.columns = map(str.lower,df.columns)
        # handle null for numeric type default 0 add
        df[df.select_dtypes(include=np.number).columns] = df.select_dtypes(include=np.number).fillna(0) 
        # handle null for object and string types
        df[df.select_dtypes(exclude=np.number).columns] = df.select_dtypes(exclude=np.number).fillna('') 
        # empty list
        futures = []
        # excute async & store future object
        for row in df.to_dict(orient='records'): # iterate each records
            futures.append(self.session.execute_async(self.insert__statement,row))
        # wait for complete async 
        for future in futures:
            future.result() # wait until insert queries
        ```
        
        # Authors :
        - ### 🙋‍♀️ [Reeya Patel](https://github.com/ReeyaPatel06)
        - ### 🙋‍♂️ [Meet Rathod](https://github.com/MeetRathod0)
        
        
        
Platform: UNKNOWN
Description-Content-Type: text/markdown
