Metadata-Version: 2.1
Name: faststream
Version: 0.1.0rc0
Summary: FastStream: the simplest way to work with a messaging queues
Project-URL: Homepage, https://faststream.airt.ai/
Project-URL: Documentation, https://faststream.airt.ai/latest/getting-started/
Project-URL: Tracker, https://github.com/airtai/FastStream/issues
Project-URL: Source, https://github.com/airtai/FastStream
Author-email: airt <info@airt.ai>, Pastukhov Nikita <diementros@yandex.ru>
License-File: LICENSE
Keywords: framework,kafka,message brokers,rabbitmq
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: Web Environment
Classifier: Framework :: AsyncIO
Classifier: Framework :: Pydantic
Classifier: Framework :: Pydantic :: 1
Classifier: Framework :: Pydantic :: 2
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: Information Technology
Classifier: Intended Audience :: System Administrators
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Topic :: Internet
Classifier: Topic :: Software Development
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.8
Requires-Dist: fast-depends>=2.1.3
Requires-Dist: typer
Requires-Dist: uvloop!=0.15.0,!=0.15.1,>=0.14.0; sys_platform != 'win32' and (sys_platform != 'cygwin' and platform_python_implementation != 'PyPy')
Requires-Dist: watchfiles
Provides-Extra: dev
Requires-Dist: faststream[devdocs,docs,kafka,lint,publish,rabbit,testing]; extra == 'dev'
Provides-Extra: devdocs
Requires-Dist: cairosvg; extra == 'devdocs'
Requires-Dist: mdx-include<2.0.0,>=1.4.1; extra == 'devdocs'
Requires-Dist: mike>=1.1.0; extra == 'devdocs'
Requires-Dist: mkdocs-git-revision-date-localized-plugin>=1.2.0; extra == 'devdocs'
Requires-Dist: mkdocs-glightbox==0.3.4; extra == 'devdocs'
Requires-Dist: mkdocs-literate-nav>=0.6.0; extra == 'devdocs'
Requires-Dist: mkdocs-macros-plugin>=1.0.0; extra == 'devdocs'
Requires-Dist: mkdocs-material>=9.0.0; extra == 'devdocs'
Requires-Dist: mkdocs-static-i18n>=1.0.0; extra == 'devdocs'
Requires-Dist: mkdocstrings[python]>=0.23.0; extra == 'devdocs'
Requires-Dist: pillow; extra == 'devdocs'
Provides-Extra: docs
Requires-Dist: fastapi>=0.100.0; extra == 'docs'
Requires-Dist: uvicorn>=0.17.0; extra == 'docs'
Provides-Extra: kafka
Requires-Dist: aiokafka>=0.8; extra == 'kafka'
Provides-Extra: lint
Requires-Dist: bandit==1.7.5; extra == 'lint'
Requires-Dist: black==23.9.1; extra == 'lint'
Requires-Dist: detect-secrets==1.4.0; extra == 'lint'
Requires-Dist: isort>=5; extra == 'lint'
Requires-Dist: mypy==1.5.1; extra == 'lint'
Requires-Dist: pre-commit==3.4.0; extra == 'lint'
Requires-Dist: pyupgrade-directories; extra == 'lint'
Requires-Dist: ruff==0.0.289; extra == 'lint'
Requires-Dist: semgrep==1.40.0; extra == 'lint'
Requires-Dist: types-pyyaml; extra == 'lint'
Provides-Extra: publish
Requires-Dist: hatch==1.7.0; extra == 'publish'
Provides-Extra: rabbit
Requires-Dist: aio-pika>=9; extra == 'rabbit'
Provides-Extra: test-core
Requires-Dist: coverage[toml]>=7.2; extra == 'test-core'
Requires-Dist: dirty-equals==0.6.0; extra == 'test-core'
Requires-Dist: pytest-asyncio>=0.21; extra == 'test-core'
Requires-Dist: pytest==7.4.2; extra == 'test-core'
Provides-Extra: testing
Requires-Dist: fastapi>=0.100.0b; extra == 'testing'
Requires-Dist: faststream[test-core]; extra == 'testing'
Requires-Dist: httpx; extra == 'testing'
Requires-Dist: pydantic-settings; extra == 'testing'
Requires-Dist: pyyaml; extra == 'testing'
Requires-Dist: requests; extra == 'testing'
Description-Content-Type: text/markdown

[Note]: # (This is an auto-generated file. Please edit docs/docs/en/index.md instead)


FastStream
================

<b>Effortless event stream integration for your services</b>

------------------------------------------------------------------------

<a href="https://github.com/airtai/faststream/actions/workflows/test.yaml" target="_blank">
  <img src="https://github.com/airtai/faststream/actions/workflows/test.yaml/badge.svg?branch=main" alt="Test Passing"/>
</a>
<!-- <a href="https://coverage-badge.samuelcolvin.workers.dev/redirect/airtai/faststream" target="_blank">
    <img src="https://coverage-badge.samuelcolvin.workers.dev/airtai/faststream.svg" alt="Coverage"> -->
</a>
<a href="https://pypi.org/project/faststream" target="_blank">
  <img src="https://img.shields.io/pypi/v/faststream?label=PyPI" alt="Package version">
</a>
<a href="https://www.pepy.tech/projects/faststream" target="_blank">
  <img src="https://static.pepy.tech/personalized-badge/faststream?period=month&units=international_system&left_color=grey&right_color=green&left_text=downloads/month" alt="Downloads"/>
</a>
<a href="https://pypi.org/project/faststream" target="_blank">
  <img src="https://img.shields.io/pypi/pyversions/faststream.svg" alt="Supported Python versions">
</a>
<a href="https://github.com/airtai/faststream/actions/workflows/codeql.yml" target="_blank">
  <img src="https://github.com/airtai/faststream/actions/workflows/codeql.yml/badge.svg" alt="CodeQL">
</a>
<a href="https://github.com/airtai/faststream/actions/workflows/dependency-review.yaml" target="_blank">
  <img src="https://github.com/airtai/faststream/actions/workflows/dependency-review.yaml/badge.svg" alt="Dependency Review">
</a>
<a href="https://github.com/airtai/faststream" target="_blank">
  <img src="https://img.shields.io/github/license/airtai/faststream.png" alt="Github">
</a>

------------------------------------------------------------------------

[FastStream](https://faststream.airt.ai/) is a powerful and easy-to-use Python
library for building asynchronous services that interact with event streams.
 Built on top of [Pydantic](https://docs.pydantic.dev/) and
[AsyncAPI](https://www.asyncapi.com/), FastStream simplifies the process
of writing producers and consumers for message queues, handling all the
parsing, networking, task scheduling and data generation automatically.
With FastStream, you can quickly prototype and develop high-performance
event-based services with minimal code, making it an ideal choice for
developers looking to streamline their workflow and accelerate their
projects.

## History

FastStream is a new package based on the ideas and experiences gained from
[FastKafka](https://github.com/airtai/fastkafka) and
[Propan](https://github.com/lancetnik/propan). By joining our forces, we
 picked up the best from both packages and created the unified way to write
  services capable of processing streamed data regradless of the underliying protocol.

  We'll continue to maintain both packages, but new development will be in this
  project. If you are starting a new service, this package is the recommended way to do it.


#### ⭐⭐⭐ Stay in touch ⭐⭐⭐

Please show your support and stay in touch by:

- giving our [GitHub repository](https://github.com/airtai/faststream/) a
  star, and

- joining our [Discord server](https://discord.gg/CJWmYpyFbc).

Your support helps us to stay in touch with you and encourages us to
continue developing and improving the library. Thank you for your
support!

------------------------------------------------------------------------

<!-- #### 🐝🐝🐝 We are quite busy lately 🐝🐝🐝

![Alt](https://repobeats.axiom.co/api/embed/d2d9164b6bf69bc14af4e6eb47e437b876d0dc0f.svg "Repobeats analytics image")
 -->

## Install

FastStream works on Linux, macOS, Windows and most Unix-style operating systems.
You can install it with `pip` as usual:

``` sh
pip install faststream
```

## Writing app code

Here is an example python app using FastStream that consumes data from a
topic, increments the value, and outputs the data to another topic.

``` python
from pydantic import BaseModel, Field, NonNegativeFloat

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker


class DataBasic(BaseModel):
    data: NonNegativeFloat = Field(
        ..., examples=[0.5], description="Float data example"
    )


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.publisher("output_data")
@broker.subscriber("input_data")
async def on_input_data(msg: DataBasic, logger: Logger) -> DataBasic:
    logger.info(msg)
    return DataBasic(data=msg.data + 1.0)
```

### Messages

FastStream uses [Pydantic](https://docs.pydantic.dev/) to parse input
JSON-encoded data into Python objects, making it easy to work with
structured data in your Kafka-based applications. Pydantic’s
[`BaseModel`](https://docs.pydantic.dev/usage/models/) class allows you
to define messages using a declarative syntax, making it easy to specify
the fields and types of your messages.

This example defines one message class for use in a FastStream
application, `Data`.

``` python hl_lines="1 7-10"
from pydantic import BaseModel, Field, NonNegativeFloat

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker


class DataBasic(BaseModel):
    data: NonNegativeFloat = Field(
        ..., examples=[0.5], description="Float data example"
    )

# Code below omitted 👇
```

<details>
<summary>👀 Full file preview</summary>

``` python
from pydantic import BaseModel, Field, NonNegativeFloat

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker


class DataBasic(BaseModel):
    data: NonNegativeFloat = Field(
        ..., examples=[0.5], description="Float data example"
    )


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.publisher("output_data")
@broker.subscriber("input_data")
async def on_input_data(msg: DataBasic, logger: Logger) -> DataBasic:
    logger.info(msg)
    return DataBasic(data=msg.data + 1.0)
```

</details>

These message class will be used to parse and validate incoming data
when consuming and to produce a JSON-encoded message when producing.
Using Pydantic’s BaseModel in combination with FastStream makes it easy
to work with structured data in your Event-based applications.

### Application

This example shows how to initialize a FastStream application.

It starts by initialising a `Broker` object with the address of the Message broker.

Next, an object of the `FastStream` class is created and a `Broker` object is passed to it.

``` python hl_lines="3 4"
# Code above omitted 👆

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

# Code below omitted 👇
```

<details>
<summary>👀 Full file preview</summary>

``` python
from pydantic import BaseModel, Field, NonNegativeFloat

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker


class DataBasic(BaseModel):
    data: NonNegativeFloat = Field(
        ..., examples=[0.5], description="Float data example"
    )


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.publisher("output_data")
@broker.subscriber("input_data")
async def on_input_data(msg: DataBasic, logger: Logger) -> DataBasic:
    logger.info(msg)
    return DataBasic(data=msg.data + 1.0)
```

</details>

### Function decorators

FastStream brokers provide convenient function decorators `@broker.subscriber`
and `@broker.publisher` to allow you to delegate the actual process of

- consuming and producing data to Event queues, and

- decoding and encoding JSON encoded messages

from user defined functions to the framework. The FastStream framework
delegates these jobs to AIOKafka and Pydantic libraries.

These decorators make it easy to specify the processing logic for your
consumers and producers, allowing you to focus on the core
business logic of your application without worrying about the underlying
integration.

This following example shows how to use the `@broker.subscriber` and
`@broker.publisher` decorators in a FastStream application:

- The `@broker.subscriber` decorator is applied to the `on_input_data`
  function, which specifies that this function should be called whenever
  a message is received on the “input_data” Kafka topic. The
  `on_input_data` function takes a single argument which is expected to
  be an instance of the `Data` message class. Specifying the type
  of the single argument is instructing the Pydantic to use
  `InputData.parse_raw()` on the consumed message before passing it to
  the user defined function `on_input_data`.

- The `@broker.publisher` decorator is applied also to the `on_input_data` function,
  which specifies that this function should produce a message to the
  “output_data” topic whenever it is called. The `on_input_data`
  function takes the input data and creates a new
  `Data` message with incremented value and then returns it. The
  framework will call the `Data.json().encode("utf-8")` function
  on the returned value and produce it to the specified topic.

``` python hl_lines="17-21"
from pydantic import BaseModel, Field, NonNegativeFloat

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker


class DataBasic(BaseModel):
    data: NonNegativeFloat = Field(
        ..., examples=[0.5], description="Float data example"
    )


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.publisher("output_data")
@broker.subscriber("input_data")
async def on_input_data(msg: DataBasic, logger: Logger) -> DataBasic:
    logger.info(msg)
    return DataBasic(data=msg.data + 1.0)
```

### Testing the service

The service can be tested using the `TestBroker` context managers which, by default, puts the Broker into "testing mode".

The Tester will redirect your `subscriber` and `publisher` decorated functions to the InMemory brokers so that you can quickly test your app without the need for a running broker and all its dependencies.

Using pytest, the test for our service would look like this:

``` python
import pytest

from faststream.kafka import TestKafkaBroker

from .basic import DataBasic, broker, on_input_data


@pytest.mark.asyncio
async def test_base_app():
    @broker.subscriber("output_data")
    async def on_output_data(msg: DataBasic):
        pass

    async with TestKafkaBroker(broker):
        await broker.publish(DataBasic(data=0.2), "input_data")

        on_input_data.mock.assert_called_once_with(dict(DataBasic(data=0.2)))

        on_output_data.mock.assert_called_once_with(dict(DataBasic(data=1.2)))
```

First we pass our broker to the `TestKafkaBroker`

``` python hl_lines="3 14"
import pytest

from faststream.kafka import TestKafkaBroker

from .basic import DataBasic, broker, on_input_data


@pytest.mark.asyncio
async def test_base_app():
    @broker.subscriber("output_data")
    async def on_output_data(msg: DataBasic):
        pass

    async with TestKafkaBroker(broker):
        await broker.publish(DataBasic(data=0.2), "input_data")

        on_input_data.mock.assert_called_once_with(dict(DataBasic(data=0.2)))

        on_output_data.mock.assert_called_once_with(dict(DataBasic(data=1.2)))
```

After passing the broker to the `TestKafkaBroker` we can publish an event to "input_data" and check if the tested broker produced a response as a reaction to it.

To check the response, we registered an additional `on_output_data` subscriber which will capture events on "output_data" topic.

``` python hl_lines="10-12 19"
import pytest

from faststream.kafka import TestKafkaBroker

from .basic import DataBasic, broker, on_input_data


@pytest.mark.asyncio
async def test_base_app():
    @broker.subscriber("output_data")
    async def on_output_data(msg: DataBasic):
        pass

    async with TestKafkaBroker(broker):
        await broker.publish(DataBasic(data=0.2), "input_data")

        on_input_data.mock.assert_called_once_with(dict(DataBasic(data=0.2)))

        on_output_data.mock.assert_called_once_with(dict(DataBasic(data=1.2)))
```

## Running the application

The application can be started using builtin FastStream CLI command.

First we will save our application code to `app.py` file. Here is the application code again:

``` python
from pydantic import BaseModel, Field, NonNegativeFloat

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker


class DataBasic(BaseModel):
    data: NonNegativeFloat = Field(
        ..., examples=[0.5], description="Float data example"
    )


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.publisher("output_data")
@broker.subscriber("input_data")
async def on_input_data(msg: DataBasic, logger: Logger) -> DataBasic:
    logger.info(msg)
    return DataBasic(data=msg.data + 1.0)
```

In order to get all Kafka or RabbitMQ related dependancies, you must install FastStream with the `kafka` or `rabbit` options, respectively:

``` sh
pip install faststream[kafka]
```

To run the service, use the FastStream CLI command and pass the module (in this case, the file where the app implementation is located) and the app simbol to the command.

``` shell
faststream run basic:app
```

After running the command you should see the following output:

``` shell
INFO     - FastStream app starting...
INFO     - input_data |            - `OnInputData` waiting for messages
INFO     - FastStream app started successfully! To exit press CTRL+C
```

## License

FastStream is licensed under the Apache License 2.0

A permissive license whose main conditions require preservation of
copyright and license notices. Contributors provide an express grant of
patent rights. Licensed works, modifications, and larger works may be
distributed under different terms and without source code.

The full text of the license can be found
[here](https://raw.githubusercontent.com/airtai/faststream/main/LICENSE).

## Contributors

Thanks for all of these amazing peoples made the project better!

<a href="https://github.com/airtai/faststream/graphs/contributors">
  <img src="https://contrib.rocks/image?repo=airtai/faststream"/>
</a>
