Metadata-Version: 2.4
Name: qena-shared-lib
Version: 0.1.2
Summary: A shared tools for other services
Requires-Python: >=3.10
Requires-Dist: cronsim~=2.0
Requires-Dist: fastapi[all]~=0.115.0
Requires-Dist: httpx~=0.27.0
Requires-Dist: jwt~=1.3.0
Requires-Dist: passlib[bcrypt]~=1.7.0
Requires-Dist: pika~=1.3.0
Requires-Dist: prometheus-client~=0.21.0
Requires-Dist: prometheus-fastapi-instrumentator~=7.0.0
Requires-Dist: punq~=0.7.0
Requires-Dist: pydantic~=2.10.0
Description-Content-Type: text/markdown

# Qena shared lib

A set of shared tools for other services. It includes:

- FastAPI app builder
- A wrapper around fastapi to make it class based.
- RabbitMQ utility class to listen, respond, publish and make RPC requests.
- Logstash utility class to log messages in `ecs` (Elastic Common Schema).
- A simple task scheduler, to schedule tasks to run at specific times.
- Background task runner.
- Security tools (password hasher, JWT, ACL).
- IoC container to manage dependencies used across fastapi, rabbitmq manager and schedule manager.

# Usage

## Http

To create a FastAPI app:

``` py
def main() -> FastAPI:
    builder = (
        Builder()
        .with_title("Qena shared lib")
        .with_description("A shared tools for other services.")
        .with_version("0.1.0")
        .with_environment(Environment.PRODUCTION)
    )

    app = builder.build()

    return app
```

To run the app:

``` sh
$ uvicorn --factory main:main
```

### Lifespan

``` py
@asynccontextmanager
async def lifespan(app: FastAPI):
    ...

    yield

    ...


def main() -> FastAPI:
    ...

    builder.with_lifespan(lifespan)

    ...
```

### Dependencies

``` py
def main() -> FastAPI:
    ...

    builder.with_singleton(EmailService)
    builder.with_transient(Database)

    ...
```

### Controllers

``` py
@ApiController("/users")
class UserController(ControllerBase):

    def __init__(self, email_service: EmailService):
        self._email_service = email_service

    @post()
    async def send_email(self, message: str):
        await self._email_service.send(message)


def main() -> FastAPI:
    ...

    builder.with_controllers([
        UserController
    ])
```

### Routers

``` py
router = APIRouter(prefix="/auth")


@router.post("")
async def login(
    db: Annotated[Database, DependsOn(Database)],
    username: str,
    password: str
):
    ...


def main() -> FastAPI:
    ...

    builder.with_routers([
        router
    ])

    ...
```

To enable metrics.

``` py
def main() -> FastAPI:
    ...

    builder.with_metrics()

    ...
```

## Logstash

``` py
@asynccontextmanager
async def lifespan(app: FastAPI):
    logstash = get_service(BaseLogstashSender)

    await logstash.start()

    yield

    await logstash.stop()


def main() -> FastAPI:
    ...

    logstash = HTTPSender(
        service_name="qena-shared-lib",
        url="http://127.0.0.1:18080",
        user="logstash",
        password="logstash",
    )
    # or
    # logstash = TCPSender(
    #   service_name="qena-shared-lib",
    #   host="127.0.0.1",
    #   port=18090
    # )
    builder.with_singleton(
        service=BaseLogstashSender,
        instance=logstash,
    )

    ...


@router.get("")
def log_message(
    logstash: Annotated[
        BaseLogstashSender,
        DependsOn(BaseLogstashSender),
    ],
    message: str,
):
    logstash.info(message)
```

## Rabbitmq

To create rabbitmq connection manager.

``` py
@asynccontextmanager
async def lifespan(app: FastAPI):
    rabbitmq = get_service(RabbitMqManager)

    await rabbitmq.connect()

    yield

    rabbitmq.disconnect()


@Consumer("UserQueue")
class UserConsumer(ListenerBase):

    def __init__(self, db: Database):
        self._db = db

    @consume()
    async def store_user(self, user: User):
        await self._db.save(user)


def main() -> FastAPI:
    ...

    rabbitmq = RabbitMqManager(
        logstash=logstash,
        container=builder.container,
    )

    rabbitmq.include_listener(UserConsumer)
    builder.with_singleton(
        service=RabbitMqManager,
        instance=rabbitmq,
    )

    ...
```

### Publisher

``` py
@router.post("")
async def store_user(
    rabbitmq: Annotated[
        RabbitMqManager,
        DependsOn(RabbitMqManager)
    ],
    user: User,
):
    publisher = rabbitmq.publisher("UserQueue")

    await publisher.publish(user)
    # await publisher.publish_with_arguments(user)
```

### RPC client

``` py
@router.get("")
async def get_user(
    rabbitmq: Annotated[
        RabbitMqManager,
        DependsOn(RabbitMqManager)
    ],
    user_id: str,
):
    rpc_client = rabbitmq.rpc_client("UserQueue")

    user = await rpc_client.call(user_id)
    # user = await rpc_client.call_with_arguments(user_id)

    return user
```

## Scheduler

``` py
@asynccontextmanager
async def lifespan(app: FastAPI):
    schedule_manager = get_service(ScheduleManager)

    schedule_manager.start()

    yield

    schedule_manager.stop()


@Scheduler()
class TaskScheduler(SchedulerBase):

    def __init__(self, db: Database):
        self._db = db

    @schedule("* * * * *")
    def do_task(
        self,

    ):
        ...
# or
# scheduler = Scheduler()

# @scheduler.schedule("* * * * *")
# def do_task(
#     db: Annotated[Database, DependsOn(Database)]
# ):
#     ...


def main() -> FastAPI:
    ...

    schedule_manager = ScheduleManager(
        logstash=logstash,
        container=builder.container
    )

    schedule_manager.include_scheduler(TaskScheduler)
    builder.with_singleton(
        service=ScheduleManager,
        instance=schedule_manager,
    )

    ...
```

## Background

``` py
@asynccontextmanager
async def lifespan(app: FastAPI):
    background = get_service(Background)

    background.start()

    yield

    background.stop()


def main() -> FastAPI:
    ...

    builder.with_singleton(
        service=BaseLogstashSender,
        instance=logstash,
    )
    builder.with_singleton(Background)

    ...


async def data_processor(data: Data):
    ...


@router.get("")
async def process_data(
    background: Annotated[
        Background,
        DependsOn(Background)
    ],
    data: Data
):
    background.add_task(BackgroundTask(data_processor, data))
```

## Security

### Password hasher

``` py
@ApiController("/users")
class UserController(ControllerBase):

    def __init__(self, password_hasher: PasswordHasher):
        self._password_hasher = password_hasher

    @post()
    async def signup(self, user: User):
        await self._password_hasher.hash(user.password)

    @post()
    async def login(self, user: User):
        await self._password_hasher.verify(user.password)


def main() -> FastAPI:
    ...

    builder.with_singleton(PasswordHasher)
    builder.with_controllers([
        UserController
    ])

    ...
```

### JWT

``` py
@ApiController("/users")
class UserController(ControllerBase):

    def __init__(
        self,

        ...

        jwt: JwtAdapter,
    ):
        ...

        self._jwt = jwt

    @post()
    async def login(self, user: User):
        payload = { ... }

        await self._jwt.encode(payload)

    @post()
    async def verify(self, token: str):
        await self._jwt.decode(token)


def main() -> FastAPI:
    ...

    builder.with_singleton(JwtAdapter)
    builder.with_controllers([
        UserController
    ])

    ...
```

### ACL

``` py
@ApiController("/users")
class UserController(ControllerBase):

    @post()
    async def get_user(
        self,
        user: Annotated[
            UserInfo,
            EndpointACL(
                user_type="ADMIN",
                permissions=[
                    "READ"
                ],
            )
        ]
    ):
        ...


@router.get("")
async def get_users(
    user: Annotated[
        UserInfo,
        EndpointACL("ADMIN")
    ]
):
    ...
```
