Metadata-Version: 2.1
Name: faststream
Version: 0.1.0
Summary: FastStream: the simplest way to work with messaging queues
Project-URL: Homepage, https://faststream.airt.ai/
Project-URL: Documentation, https://faststream.airt.ai/latest/getting-started/
Project-URL: Tracker, https://github.com/airtai/FastStream/issues
Project-URL: Source, https://github.com/airtai/FastStream
Author-email: airt <info@airt.ai>, Pastukhov Nikita <diementros@yandex.ru>
License-File: LICENSE
Keywords: framework,kafka,message brokers,rabbitmq
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: Web Environment
Classifier: Framework :: AsyncIO
Classifier: Framework :: Pydantic
Classifier: Framework :: Pydantic :: 1
Classifier: Framework :: Pydantic :: 2
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: Information Technology
Classifier: Intended Audience :: System Administrators
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Topic :: Internet
Classifier: Topic :: Software Development
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.8
Requires-Dist: fast-depends>=2.1.3
Requires-Dist: typer
Requires-Dist: uvloop!=0.15.0,!=0.15.1,>=0.14.0; sys_platform != 'win32' and (sys_platform != 'cygwin' and platform_python_implementation != 'PyPy')
Requires-Dist: watchfiles
Provides-Extra: dev
Requires-Dist: faststream[devdocs,docs,kafka,lint,publish,rabbit,testing]; extra == 'dev'
Provides-Extra: devdocs
Requires-Dist: cairosvg; extra == 'devdocs'
Requires-Dist: mdx-include<2.0.0,>=1.4.1; extra == 'devdocs'
Requires-Dist: mike>=1.1.0; extra == 'devdocs'
Requires-Dist: mkdocs-git-revision-date-localized-plugin>=1.2.0; extra == 'devdocs'
Requires-Dist: mkdocs-glightbox==0.3.4; extra == 'devdocs'
Requires-Dist: mkdocs-literate-nav>=0.6.0; extra == 'devdocs'
Requires-Dist: mkdocs-macros-plugin>=1.0.0; extra == 'devdocs'
Requires-Dist: mkdocs-material>=9.0.0; extra == 'devdocs'
Requires-Dist: mkdocs-static-i18n>=1.0.0; extra == 'devdocs'
Requires-Dist: mkdocstrings[python]>=0.23.0; extra == 'devdocs'
Requires-Dist: pillow; extra == 'devdocs'
Provides-Extra: docs
Requires-Dist: fastapi>=0.100.0; extra == 'docs'
Requires-Dist: uvicorn>=0.17.0; extra == 'docs'
Provides-Extra: kafka
Requires-Dist: aiokafka>=0.8; extra == 'kafka'
Provides-Extra: lint
Requires-Dist: bandit==1.7.5; extra == 'lint'
Requires-Dist: black==23.9.1; extra == 'lint'
Requires-Dist: detect-secrets==1.4.0; extra == 'lint'
Requires-Dist: isort>=5; extra == 'lint'
Requires-Dist: mypy==1.5.1; extra == 'lint'
Requires-Dist: pre-commit==3.4.0; extra == 'lint'
Requires-Dist: pyupgrade-directories; extra == 'lint'
Requires-Dist: ruff==0.0.289; extra == 'lint'
Requires-Dist: semgrep==1.40.0; extra == 'lint'
Requires-Dist: types-pyyaml; extra == 'lint'
Provides-Extra: publish
Requires-Dist: hatch==1.7.0; extra == 'publish'
Provides-Extra: rabbit
Requires-Dist: aio-pika>=9; extra == 'rabbit'
Provides-Extra: test-core
Requires-Dist: coverage[toml]>=7.2; extra == 'test-core'
Requires-Dist: dirty-equals==0.6.0; extra == 'test-core'
Requires-Dist: pytest-asyncio>=0.21; extra == 'test-core'
Requires-Dist: pytest==7.4.2; extra == 'test-core'
Provides-Extra: testing
Requires-Dist: fastapi>=0.100.0b; extra == 'testing'
Requires-Dist: faststream[test-core]; extra == 'testing'
Requires-Dist: httpx; extra == 'testing'
Requires-Dist: pydantic-settings; extra == 'testing'
Requires-Dist: pyyaml; extra == 'testing'
Requires-Dist: requests; extra == 'testing'
Description-Content-Type: text/markdown

[Note]: # (This is an auto-generated file. Please edit docs/docs/en/index.md instead)

# FastStream

<b>Effortless event stream integration for your services</b>

---

<p align="center">
  <a href="https://github.com/airtai/faststream/actions/workflows/test.yaml" target="_blank">
    <img src="https://github.com/airtai/faststream/actions/workflows/test.yaml/badge.svg?branch=main" alt="Test Passing"/>
  </a>

  <a href="https://coverage-badge.samuelcolvin.workers.dev/redirect/airtai/faststream" target="_blank">
      <img src="https://coverage-badge.samuelcolvin.workers.dev/airtai/faststream.svg" alt="Coverage">
  </a>

  <a href="https://www.pepy.tech/projects/faststream" target="_blank">
    <img src="https://static.pepy.tech/personalized-badge/faststream?period=month&units=international_system&left_color=grey&right_color=green&left_text=downloads/month" alt="Downloads"/>
  </a>

  <a href="https://pypi.org/project/faststream" target="_blank">
    <img src="https://img.shields.io/pypi/v/faststream?label=PyPI" alt="Package version">
  </a>

  <a href="https://pypi.org/project/faststream" target="_blank">
    <img src="https://img.shields.io/pypi/pyversions/faststream.svg" alt="Supported Python versions">
  </a>

  <br/>

  <a href="https://github.com/airtai/faststream/actions/workflows/codeql.yml" target="_blank">
    <img src="https://github.com/airtai/faststream/actions/workflows/codeql.yml/badge.svg" alt="CodeQL">
  </a>

  <a href="https://github.com/airtai/faststream/actions/workflows/dependency-review.yaml" target="_blank">
    <img src="https://github.com/airtai/faststream/actions/workflows/dependency-review.yaml/badge.svg" alt="Dependency Review">
  </a>

  <a href="https://github.com/airtai/faststream/blob/main/LICENSE" target="_blank">
    <img src="https://img.shields.io/github/license/airtai/faststream.png" alt="Github">
  </a>

  <a href="https://discord.gg/CJWmYpyFbc" target="_blank">
      <img alt="Discord" src="https://img.shields.io/discord/1085457301214855171?logo=discord">
  </a>
</p>

---

## Features

[**FastStream**](https://faststream.airt.ai/) simplifies the process of writing producers and consumers for message queues, handling all the
parsing, networking and documentation generation automatically.

Making streaming microservices has never been easier. Designed with junior developers in mind, **FastStream** simplifies your work while keeping the door open for more advanced use-cases. Here's a look at the core features that make **FastStream** a go-to framework for modern, data-centric microservices.

- **Multiple Brokers**: **FastStream** provides a unified API to work across multiple message brokers (**Kafka** and **RabbitMQ** are supported)

- [**Pydantic Validation**](#writing-app-code): Leverage [**Pydantic's**](https://docs.pydantic.dev/) validation capabilities to serialize and validate incoming messages

- [**Automatic Docs**](#project-documentation): Stay ahead with automatic [AsyncAPI](https://www.asyncapi.com/) documentation.

- **Intuitive**: fully typed editor support makes your development experience smooth, catching errors before they reach runtime

- [**Powerful Dependency Injection System**](#dependencies): Manage your service dependencies efficiently with **FastStream**'s built-in DI system.

- [**Testable**](#testing-the-service): supports in-memory tests, making your CI/CD pipeline faster and more reliable

- **Extendable**: use extensions for lifespans, custom serialization and middlewares

- [**Integrations**](#any-framework): **FastStream** is fully compatible with any HTTP framework you want ([**FastAPI**](#fastapi-plugin) especially)

- **Built for Automatic Code Generation**: **FastStream** is optimized for automatic code generation using advanced models like GPT and Llama

That's **FastStream** in a nutshell—easy, efficient, and powerful. Whether you're just starting with streaming microservices or looking to scale, **FastStream** has got you covered.

---

## History

**FastStream** is a new package based on the ideas and experiences gained from [FastKafka](https://github.com/airtai/fastkafka) and [Propan](https://github.com/lancetnik/propan). By joining forces, we picked the best from both packages and created a unified way to write services capable of processing streamed data regardless of the underlying protocol. We'll continue to maintain both packages, but new development will be in this project. If you are starting a new service, this package is the recommended way to do it.

---

## Install

**FastStream** works on **Linux**, **macOS**, **Windows** and most **Unix**-style operating systems.
You can install it with `pip` as usual:

```sh
pip install faststream[kafka]
# or
pip install faststream[rabbit]
```

By default, **FastStream** uses **PydanticV2**, which is written in **Rust**. If your platform has no **Rust** support, you can downgrade it manually: **FastStream** works correctly with **PydanticV1** as well.
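
To see which Pydantic major version is active in an environment, a check along these lines can help. This is a minimal sketch using only the standard library; the helper names `major_version` and `installed_pydantic_major` are made up for illustration and are not part of **FastStream**.

```python
from importlib import metadata


def major_version(version: str) -> int:
    """Return the leading integer of a version string such as '2.4.2'."""
    return int(version.split(".")[0])


def installed_pydantic_major():
    """Return the installed Pydantic major version, or None if it is absent."""
    try:
        return major_version(metadata.version("pydantic"))
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    print("Pydantic major version:", installed_pydantic_major())
```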

---

## Writing app code

**FastStream** brokers provide convenient function decorators `@broker.subscriber`
and `@broker.publisher` to allow you to delegate the actual process of

- consuming and producing data to event queues, and

- decoding and encoding JSON-encoded messages

These decorators make it easy to specify the processing logic for your consumers and producers, allowing you to focus on the core business logic of your application without worrying about the underlying integration.

Also, **FastStream** uses [**Pydantic**](https://docs.pydantic.dev/) to parse input
JSON-encoded data into Python objects, making it easy to work with structured data in your applications, so you can serialize your input messages using just type annotations.

Here is an example Python app using **FastStream** that consumes data from an incoming data stream and outputs the data to another one.

```python
from faststream import FastStream
from faststream.kafka import KafkaBroker
# from faststream.rabbit import RabbitBroker

broker = KafkaBroker("localhost:9092")
# broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

@broker.subscriber("in")
@broker.publisher("out")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"
```
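
Because the handler body is ordinary Python, it can help to see how a JSON payload maps onto its arguments. The sketch below is a standalone copy of the function without the broker decorators, so it runs without **FastStream** installed; unpacking the payload by hand only approximates what the framework does for you.

```python
import asyncio
import json


async def handle_msg(user: str, user_id: int) -> str:
    # Same body as the decorated handler above, minus the broker wiring.
    return f"User: {user_id} - {user} registered"


# A JSON message whose keys match the handler's argument names.
raw = b'{"user": "John", "user_id": 1}'
fields = json.loads(raw)

result = asyncio.run(handle_msg(**fields))
print(result)  # User: 1 - John registered
```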

Also, **Pydantic**’s [`BaseModel`](https://docs.pydantic.dev/usage/models/) class allows you
to define messages using a declarative syntax, making it easy to specify the fields and types of your messages.

```python
from pydantic import BaseModel, Field, PositiveInt
from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

class User(BaseModel):
    user: str = Field(..., examples=["John"])
    user_id: PositiveInt = Field(..., examples=[1])

@broker.subscriber("in")
@broker.publisher("out")
async def handle_msg(data: User) -> str:
    return f"User: {data.user} - {data.user_id} registered"
```

---

## Testing the service

The service can be tested using the `TestBroker` context managers, which by default put the broker into "testing mode".

The Tester will redirect your `subscriber` and `publisher` decorated functions to the InMemory brokers so that you can quickly test your app without the need for a running broker and all its dependencies.

Using pytest, the test for our service would look like this:

```python
# Code above omitted 👆

import pytest
import pydantic
from faststream.kafka import TestKafkaBroker


@pytest.mark.asyncio
async def test_correct():
    async with TestKafkaBroker(broker) as br:
        await br.publish({
            "user": "John",
            "user_id": 1,
        }, "in")

@pytest.mark.asyncio
async def test_invalid():
    async with TestKafkaBroker(broker) as br:
        with pytest.raises(pydantic.ValidationError):
            await br.publish("wrong message", "in")
```
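
The idea behind in-memory testing can be illustrated without any broker library. The toy class below is a hypothetical stand-in written for this example (it is not how `TestKafkaBroker` is implemented): publishing to a topic simply awaits the registered handlers in the same process.

```python
import asyncio
from collections import defaultdict


class ToyInMemoryBroker:
    """Hypothetical stand-in: routes published messages straight to handlers."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def subscriber(self, topic):
        def register(func):
            self._handlers[topic].append(func)
            return func
        return register

    async def publish(self, message, topic):
        # No network involved: each handler is awaited directly.
        return [await handler(**message) for handler in self._handlers[topic]]


toy = ToyInMemoryBroker()


@toy.subscriber("in")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"


replies = asyncio.run(toy.publish({"user": "John", "user_id": 1}, "in"))
print(replies)  # ['User: 1 - John registered']
```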

## Running the application

The application can be started using the built-in **FastStream** CLI command.

To run the service, use the **FastStream CLI** command and pass it the module (in this case, the file where the app implementation is located) and the app symbol.

``` shell
faststream run basic:app
```

After running the command you should see the following output:

``` shell
INFO     - FastStream app starting...
INFO     - input_data |            - `HandleMsg` waiting for messages
INFO     - FastStream app started successfully! To exit press CTRL+C
```

**FastStream** also provides a hot-reload feature to improve your development experience:

``` shell
faststream run basic:app --reload
```

Multiprocess horizontal scaling is available as well:

``` shell
faststream run basic:app --workers 3
```

You can learn more about **CLI** features [here](https://faststream.airt.ai/0.1.0rc0/getting-started/cli/)
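
The `basic:app` argument passed to `faststream run` follows the common `module:attribute` convention. As a rough illustration of how such a string can be resolved, and not a description of the CLI's actual code, the hypothetical helper `resolve` below imports the module and looks up the attribute.

```python
import importlib
import json


def resolve(target: str):
    """Split 'module:attribute' and return the named object."""
    module_name, _, attr_name = target.partition(":")
    if not module_name or not attr_name:
        raise ValueError(f"expected 'module:attribute', got {target!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# Demonstrated with a standard-library module; for the service above the
# string would be "basic:app" and the result would be the app object.
print(resolve("json:loads") is json.loads)  # True
```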

---

## Project Documentation

**FastStream** automatically generates documentation for your project according to the [**AsyncAPI**](https://www.asyncapi.com/) specification. You can work with the generated artifacts directly or publish a web view of the documentation on resources available to related teams.

The availability of such documentation significantly simplifies the integration of services: you can immediately see what channels and message format the application works with. And most importantly, it won't cost anything - **FastStream** has already created the docs for you!

![HTML-page](https://github.com/airtai/faststream/blob/main/docs/docs/assets/img/AsyncAPI-basic-html-short.png)
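
For orientation, an AsyncAPI document describes channels and the messages that flow through them. The dictionary below is a hand-written approximation for the `in` topic of the earlier example, assuming the AsyncAPI 2.x layout; the title, version, operation keyword and schema are illustrative, and the document **FastStream** generates will differ in detail.

```python
import json

# Hand-written approximation, not output produced by FastStream.
asyncapi_sketch = {
    "asyncapi": "2.6.0",
    "info": {"title": "Example service", "version": "0.1.0"},
    "channels": {
        "in": {
            # Operation keyword chosen for illustration only.
            "subscribe": {
                "message": {
                    "payload": {
                        "type": "object",
                        "properties": {
                            "user": {"type": "string"},
                            "user_id": {"type": "integer"},
                        },
                        "required": ["user", "user_id"],
                    }
                }
            }
        }
    },
}

print(json.dumps(asyncapi_sketch, indent=2))
```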

---

## Dependencies

**FastStream** (thanks to [**FastDepends**](https://lancetnik.github.io/FastDepends/)) has a dependency management system similar to both `pytest fixtures` and `FastAPI Depends`. Function arguments declare which dependencies are needed, and a special decorator delivers them from the global Context object.

```python
from faststream import Depends, Logger

async def base_dep(user_id: int) -> bool:
    return True

@broker.subscriber("in-test")
async def base_handler(user: str,
                       logger: Logger,
                       dep: bool = Depends(base_dep)):
    assert dep is True
    logger.info(user)
```
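
To make the mechanism concrete, here is a deliberately small sketch of default-argument dependency injection using only the standard library. The names `ToyDepends` and `call_with_deps` are invented for this example; the real dependency system does considerably more than this.

```python
import inspect


class ToyDepends:
    """Marker placed as a default value to request a dependency."""

    def __init__(self, provider):
        self.provider = provider


def call_with_deps(func, **message):
    """Call func, filling ToyDepends defaults by invoking their providers."""
    kwargs = {}
    for name, param in inspect.signature(func).parameters.items():
        if isinstance(param.default, ToyDepends):
            provider = param.default.provider
            wanted = inspect.signature(provider).parameters
            # Providers receive the message fields they declare as arguments.
            kwargs[name] = provider(**{k: message[k] for k in wanted})
        elif name in message:
            kwargs[name] = message[name]
    return func(**kwargs)


def base_dep(user_id: int) -> bool:
    return user_id > 0


def base_handler(user: str, dep: bool = ToyDepends(base_dep)) -> str:
    return f"{user}: dep={dep}"


print(call_with_deps(base_handler, user="John", user_id=1))  # John: dep=True
```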

---

## HTTP Frameworks integrations

### Any Framework

You can use **FastStream** `MQBrokers` without a `FastStream` application.
Just *start* and *stop* them according to your application lifespan.

```python
from aiohttp import web

from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")

@broker.subscriber("test")
async def base_handler(body):
    print(body)

async def start_broker(app):
    await broker.start()

async def stop_broker(app):
    await broker.close()

async def hello(request):
    return web.Response(text="Hello, world")

app = web.Application()
app.add_routes([web.get("/", hello)])
app.on_startup.append(start_broker)
app.on_cleanup.append(stop_broker)

if __name__ == "__main__":
    web.run_app(app)
```
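
The same start/stop pairing can be wrapped in an async context manager when a framework offers a lifespan hook instead of separate startup and cleanup callbacks. The sketch below assumes only the `start()` and `close()` coroutines used above; `running` and `StubBroker` are hypothetical names, and the stub exists solely so the example runs without a real broker.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def running(broker):
    """Start the broker on entry and always close it on exit."""
    await broker.start()
    try:
        yield broker
    finally:
        await broker.close()


class StubBroker:
    """Hypothetical placeholder exposing the same start()/close() coroutines."""

    def __init__(self):
        self.events = []

    async def start(self):
        self.events.append("start")

    async def close(self):
        self.events.append("close")


async def main():
    stub = StubBroker()
    async with running(stub):
        stub.events.append("serving")
    return stub.events


events = asyncio.run(main())
print(events)  # ['start', 'serving', 'close']
```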

### **FastAPI** Plugin

Also, **FastStream** can be used as part of **FastAPI**.

Just import the **StreamRouter** you need and declare message handlers with the same `@router.subscriber(...)` and `@router.publisher(...)` decorators.

```python
from fastapi import FastAPI
from pydantic import BaseModel

from faststream.kafka.fastapi import KafkaRouter

router = KafkaRouter("localhost:9092")

class Incoming(BaseModel):
    m: dict

@router.subscriber("test")
@router.publisher("response")
async def hello(m: Incoming):
    return {"response": "Hello, world!"}

app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)
```

More integration features can be found [here](https://faststream.airt.ai/0.1.0rc0/getting-started/integrations/fastapi/)

---

## Stay in touch

Please show your support and stay in touch by:

- giving our [GitHub repository](https://github.com/airtai/faststream/) a star, and

- joining our [Discord server](https://discord.gg/CJWmYpyFbc)

Your support helps us to stay in touch with you and encourages us to
continue developing and improving the library. Thank you for your
support!

---

## Contributors

Thanks to all of these amazing people who made the project better!

<a href="https://github.com/airtai/faststream/graphs/contributors">
  <img src="https://contrib.rocks/image?repo=airtai/faststream"/>
</a>
