Metadata-Version: 2.1
Name: fast-kafka-api
Version: 0.0.1rc0
Summary: Extension of FastAPI with Kafka event handlers
Home-page: https://github.com/airtai/fast-kafka-api
Author: airt
Author-email: info@airt.ai
License: Apache Software License 2.0
Keywords: nbdev jupyter notebook python
Platform: UNKNOWN
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Natural Language :: English
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: License :: OSI Approved :: Apache Software License
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Provides-Extra: dev
License-File: LICENSE

fast-kafka-api
================

<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

This file will become your README and also the index of your
documentation.

## Install

``` sh
pip install fast_kafka_api
```

## How to use

Fill me in please! Don’t forget code examples:

``` python
from os import environ

title = "Example for FastKafkaAPI"
description = "A simple example on how to use FastKafkaAPI"
version = "0.0.1"
openapi_url = "/openapi.json"
favicon_url = "/assets/images/favicon.ico"

contact = dict(name="airt.ai", url="https://airt.ai", email="info@airt.ai")

kafka_brokers = {
    "localhost": {
        "url": "kafka",
        "description": "local development kafka",
        "port": 9092,
    },
    "staging": {
        "url": "kafka.staging.acme.com",
        "description": "staging kafka",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
    "production": {
        "url": "kafka.acme.com",
        "description": "production kafka",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

kafka_server_url = "kafka"
kafka_server_port = "9092"

kafka_config = {
    "bootstrap.servers": f"{kafka_server_url}:{kafka_server_port}",
    "group.id": f"{kafka_server_url}:{kafka_server_port}_group",
    "auto.offset.reset": "earliest",
}
if "KAFKA_API_KEY" in environ:
    kafka_config = {
        **kafka_config,
        **{
            "security.protocol": "SASL_SSL",
            "sasl.mechanisms": "PLAIN",
            "sasl.username": environ["KAFKA_API_KEY"],
            "sasl.password": environ["KAFKA_API_SECRET"],
        },
    }

app = FastKafkaAPI(
    title=title,
    contact=contact,
    kafka_brokers=kafka_brokers,
    kafka_config=kafka_config,
    description=description,
    version=version,
    docs_url=None,
    redoc_url=None,
)
```

``` python
from typing import *
from datetime import datetime
from fast_kafka_api.application import KafkaMessage
from pydantic import NonNegativeInt, Field

class EventData(KafkaMessage):
    definition_id: str = Field(
        ...,
        example="appLaunch",
        description="name of the event",
        min_length=1,
    )
    occurred_time: datetime = Field(
        ...,
        example="2021-03-28T00:34:08",
        description="local time of the event",
    )
    user_id: NonNegativeInt = Field(
        ..., example=12345678, description="ID of a person"
    )
        
class TrainingDataStatus(KafkaMessage):
    no_of_records: NonNegativeInt = Field(
        ...,
        example=12_345,
        description="number of records (rows) ingested",
    )
    total_no_of_records: NonNegativeInt = Field(
        ...,
        example=1_000_000,
        description="total number of records (rows) to be ingested",
    )
        
_total_no_of_records = 0
_no_of_records_received = 0

@app.consumes("training_data")  # type: ignore
async def on_training_data(msg: EventData, produce: Callable[[str, TrainingDataStatus], None]) -> None:
    global _total_no_of_records
    global _no_of_records_received
    _no_of_records_received = _no_of_records_received + 1

    if _no_of_records_received % 100 == 0:
        training_data_status = TrainingDataStatus(
            no_of_records=_no_of_records_received,
            total_no_of_records=_total_no_of_records,
        )
        produce(topic="training_data_status", msg=training_data_status)
```


