Metadata-Version: 2.1
Name: dbsink
Version: 2.6.0
Summary: Tools to sink kafka messages to a database table
Home-page: https://github.com/axiom-data-science/dbsink
Author: Kyle Wilcox
Author-email: kyle@axds.co
License: MIT
Description: ## dbsink
        
        Read from a Kafka topic and sink to a database table, one row per message.
        
        This is not unlike the Kafka Connect JdbcConnector. This project has a much lower barrier to entry and doesn't require diving into the Kafka Connect ecosystem. I wrote the equivalent of this project using a custom JdbcConnector, and it was getting out of control and was basically untestable. So here we are.
        
        You can choose to unpack the data as `avro`, `msgpack` or the default `json`. `avro` requires an additional `registry` parameter.
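        
        As a rough illustration of what choosing a packing means, the sketch below dispatches on a packing name. The `unpack` helper is hypothetical and is not `dbsink`'s own code; the `msgpack` branch assumes the third-party `msgpack` package is installed, and the `avro` branch is left unimplemented because it depends on a schema registry.
        
        ```python
        import json
        
        
        def unpack(raw, packing="json"):
            """Hypothetical helper: decode raw message bytes by packing name."""
            if packing == "json":
                # json.loads accepts UTF-8 encoded bytes directly
                return json.loads(raw)
            if packing == "msgpack":
                # Third-party dependency, imported lazily so JSON-only use needs nothing extra
                import msgpack
                return msgpack.unpackb(raw, raw=False)
            if packing == "avro":
                # Avro decoding needs the writer schema, hence the `registry` parameter
                raise NotImplementedError("avro unpacking requires a schema registry")
            raise ValueError(f"unknown packing: {packing!r}")
        
        
        message = unpack(b'{"name": "my cool thing", "time": 1000000000}')
        ```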
        
        Docker images: https://hub.docker.com/r/axiom/dbsink/builds
        
        ## WHY?
        
        I needed to read from well-defined Kafka topics and store the results in a database table so collaborators could interact with the data in a more familiar way.
        
        It is also very convenient and easy to set up PostgREST on top of the resulting tables to get a quick read-only REST API over the tabled messages.
        
        ## Mapping messages to tables
        
        You can define custom mappings between messages and tables using a Python class. Register your custom mappings with the `dbsink.maps` entrypoint to make them available to `dbsink` at run-time.
        
        ```python
        entry_points = {
            'dbsink.maps': [
                'YourCustomMap    = you.custom.map.module:CustomMapClass',
                # ...
            ]
        }
        ```
        
        Custom mapping classes should inherit from the `BaseMap` class in `dbsink` and override the following properties and methods as needed:
        
        * `upsert_constraint_name` - Name of the constraint to use for upserting data. Set to `None` to disable upserting. Use this class property when creating the upsert constraint on your table (see example below).
        
        * `unique_index_name` - Unique index name based on the table name. Use this if defining a single unique index on your table.
        
        * `sequence_name` - Unique sequence name based on the table name. Use this if defining a single sequence column on your table.
        
        * `_check_key` - Checks for validity of a message's `key` before trying to sink. Return `True` if valid and raise an error if not.
        
        * `_check_value` - Checks for validity of a message's `value` before trying to sink. Return `True` if valid and raise an error if not.
        
        * `schema` - A list of SQLAlchemy [Column](https://docs.sqlalchemy.org/en/13/core/metadata.html#sqlalchemy.schema.Column), [Index](https://docs.sqlalchemy.org/en/13/core/constraints.html?highlight=index#sqlalchemy.schema.Index), and [Constraint](https://docs.sqlalchemy.org/en/13/core/constraints.html?highlight=constraint#sqlalchemy.schema.Constraint) schema definitions to use in table creation and updating. This fully describes your table's schema.
        
        * `message_to_values` - A function accepting `key` and `value` arguments and returning a tuple `key, dict` where the dict is the `values` to pass to SQLAlchemy's `insert().values` method. The `value` argument to this function will already be unpacked if `avro` or `msgpack` packing was specified.
        
            ```python
            insert(table).values(
              # dict_returned_ends_up_here
            )
            ```
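        
        The contract described by the hooks above can be sketched without `dbsink` installed. `SketchMap` below is a hypothetical stand-in, not the real `BaseMap`; its `check` method is assumed to simply run `_check_key` and `_check_value`, and the validation rules are invented for illustration.
        
        ```python
        import json
        from datetime import datetime, timezone
        
        
        class SketchMap:
            """Hypothetical stand-in mimicking the BaseMap hooks described above."""
        
            def _check_key(self, key):
                # Return True if valid, raise otherwise
                if not key:
                    raise ValueError("message key must not be empty")
                return True
        
            def _check_value(self, value):
                # Illustrative rule: every message must carry a 'name'
                if "name" not in value:
                    raise ValueError("message value is missing 'name'")
                return True
        
            def check(self, key, value):
                # Assumption: runs both validators, raising on the first failure
                return self._check_key(key) and self._check_value(value)
        
            def message_to_values(self, key, value):
                self.check(key, value)
                values = {
                    "sinked": datetime.now(timezone.utc).isoformat(),
                    "key": key,
                    "payload": json.dumps(value),
                }
                # The dict is what would be handed to insert(table).values(...)
                return key, values
        
        
        key, values = SketchMap().message_to_values("sensor-1", {"name": "buoy"})
        ```
        
        Raising from the `_check_*` hooks keeps invalid messages from ever reaching the insert step.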
        
        #### Example
        
        A simple example is the `StringMap` mapping included with `dbsink`:
        
        ```python
        from datetime import datetime
        
        import pytz
        import sqlalchemy as sql
        import simplejson as json
        
        from dbsink.maps import BaseMap
        
        
        class StringMap(BaseMap):
        
            @property
            def upsert_constraint_name(self):
                return None  # Ignore upserts
        
            def _check_key(self, key):
                return True  # All keys are valid
        
            def _check_value(self, value):
                # Make sure values are JSON parsable
                _ = json.loads(json.dumps(value, ignore_nan=True))
                return True
        
            @property
            def schema(self):
                return [
                    sql.Column('id',       sql.Integer, sql.Sequence(self.sequence_name), primary_key=True),
                    sql.Column('sinked',   sql.DateTime(timezone=True), index=True),
                    sql.Column('key',      sql.String, default='', index=True),
                    sql.Column('payload',  sql.String)
                ]
        
            def message_to_values(self, key, value):
                # Raises if invalid. This calls `._check_key` and `._check_value`
                self.check(key, value)
        
                values = {
                    'sinked':  datetime.utcnow().replace(tzinfo=pytz.utc).isoformat(),
                    'key':     key,
                    'payload': json.dumps(value),
                }
        
                return key, values
        ```
        
        #### Advanced Example
        
        There are no restrictions on table schemas or how you map the message data into the schema. The example below uses a `PostGIS` geography column.
        
        
        ```python
        from datetime import datetime
        
        import pytz
        import sqlalchemy as sql
        import simplejson as json
        from shapely.geometry import shape
        from geoalchemy2.types import Geography
        
        from dbsink.maps import BaseMap
        
        
        class NamedGenericGeography(BaseMap):
        
            def _check_key(self, key):
                return True  # All keys are valid
        
            def _check_value(self, value):
                # Make sure values are JSON parsable
                _ = json.loads(json.dumps(value, ignore_nan=True))
                return True
        
            @property
            def schema(self):
                return [
                    sql.Column('id',       sql.Integer, sql.Sequence(self.sequence_name), primary_key=True),
                    sql.Column('name',     sql.String, default='', index=True),
                    sql.Column('time',     sql.DateTime(timezone=True), index=True),
                    sql.Column('geom',     Geography(srid=4326)),
                    sql.Index(
                        self.unique_index_name,
                        'name',
                        'time',
                        unique=True,
                    ),
                    sql.UniqueConstraint(
                        'name',
                        'time',
                        name=self.upsert_constraint_name
                    )
                ]
        
            def message_to_values(self, key, value):
                """ Assumes a message format of
                {
                  "time": 1000000000, # unix epoch
                  "name": "my cool thing",
                  "geojson": {
                    "geometry": {
                      "type": "Polygon",
                      "coordinates": [ [ [ -118.532116484818843, 32.107425500492766 ], [ -118.457544847012443, 32.107425500492702 ], [ -118.457544847012443, 32.054517056541435 ], [ -118.532116484818872, 32.054517056541464 ], [ -118.532116484818843, 32.107425500492766 ] ] ]
                    }
                  }
                }
                """
                # Raises if invalid
                self.check(key, value)
        
                # GeoJSON `geometry` attribute to WKT
                geometry = shape(value['geojson']['geometry']).wkt
        
                values = {
                    'name': value['name'],
                    'time': datetime.fromtimestamp(value['time'], pytz.utc).isoformat(),
                    'geom': geometry,
                }
        
                return key, values
        ```
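        
        The epoch-to-timestamp conversion used for the `time` column can be checked in isolation. This sketch swaps in the standard library's `timezone.utc` for `pytz.utc` purely to stay dependency-free; `epoch_to_iso` is a hypothetical helper name.
        
        ```python
        from datetime import datetime, timezone
        
        
        def epoch_to_iso(epoch_seconds):
            """Convert unix epoch seconds to a timezone-aware ISO 8601 string in UTC."""
            return datetime.fromtimestamp(epoch_seconds, timezone.utc).isoformat()
        
        
        # The sample message above uses "time": 1000000000
        iso_time = epoch_to_iso(1000000000)
        print(iso_time)  # 2001-09-09T01:46:40+00:00
        ```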
        
        
        
        ## Configuration
        
        This program uses [`Click`](https://click.palletsprojects.com/) for its CLI. For all options, see the help output:
        
        ```sh
        $ dbsink --help
        ```
        
        #### Environment Variables
        
        All configuration options can be specified with environment variables using the pattern `DBSINK_[argument_name]=[value]`. For more information see [the Click documentation](https://click.palletsprojects.com/en/7.x/options/?highlight=auto_envvar_prefix#values-from-environment-variables).
        
        ```bash
        DBSINK_TOPIC="topic-to-listen-to" \
        DBSINK_LOOKUP="StringMap" \
        DBSINK_TABLE="MyCoolTable" \
        DBSINK_CONSUMER="myconsumer" \
        DBSINK_PACKING="msgpack" \
        DBSINK_OFFSET="earliest" \
        DBSINK_DROP="true" \
        DBSINK_VERBOSE="1" \
            dbsink
        ```
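        
        The `DBSINK_[argument_name]` pattern can be illustrated with a few lines of standard-library Python. This is only a sketch of the naming convention, not Click's implementation; `envvar_for` and `read_option` are hypothetical helpers.
        
        ```python
        import os
        
        PREFIX = "DBSINK"
        
        
        def envvar_for(option_name):
            """Map an option such as '--topic' to its environment variable name."""
            name = option_name.lstrip("-").replace("-", "_").upper()
            return f"{PREFIX}_{name}"
        
        
        def read_option(option_name, environ=os.environ, default=None):
            """Look up an option's value in the given environment mapping."""
            return environ.get(envvar_for(option_name), default)
        
        
        env = {"DBSINK_TOPIC": "topic-to-listen-to", "DBSINK_TABLE": "MyCoolTable"}
        topic = read_option("--topic", env)
        ```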
        
        ## Testing
        
        You can run the unit tests with `pytest`. To run the integration tests, first start a database with `docker run -p 30300:5432 --name dbsink-int-testing-db -e POSTGRES_USER=sink -e POSTGRES_PASSWORD=sink -e POSTGRES_DB=sink -d mdillon/postgis:11`, then run `pytest -m integration`.
        
Platform: UNKNOWN
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Science/Research
Classifier: Operating System :: OS Independent
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Topic :: Scientific/Engineering
Requires-Python: >=3.7
Description-Content-Type: text/markdown
