# Code repository dump, each file is separated by # ---- file: <name of file>

# Git metadata
# working_dir: /home/lspxl/git/github.com/muka/tap-jsonl
# remote_url: git@github.com-muka:muka/tap-jsonl.git
# remote_name: origin
# branch: detached
# commit: f2f247362fdb170ddbf137b1ddf97c08c1f15c56
# commit_time: 2026-01-02T18:55:35+01:00
# author: muka <luca.capra@gmail.com>
# message: fix: py3.10 support
# dirty: no


# ---- file: README.md
# tap-jsonl

`tap-jsonl` is a Singer tap that extracts records from JSONL (JSON Lines) files matched by a glob pattern.

Built with the [Meltano Tap SDK](https://sdk.meltano.com) for Singer Taps.

<!--

Developer TODO: Update the below as needed to correctly describe the install procedure. For instance, if you do not have a PyPI repo, or if you want users to directly install from your git repo, you can modify this step as appropriate.

## Installation

Install from PyPI:

```bash
uv tool install tap-jsonl
```

Install from GitHub:

```bash
uv tool install git+https://github.com/ORG_NAME/tap-jsonl.git@main
```

-->

## Configuration

### Accepted Config Options

<!--
Developer TODO: Provide a list of config options accepted by the tap.

This section can be created by copy-pasting the CLI output from:

```
tap-jsonl --about --format=markdown
```
-->

Example config (for instance, the `config:` section of a Meltano extractor):

```yaml
config:
  path: ./data/**/*.jsonl
  stream_name: my_stream
  primary_keys: ["id"]
  encoding: utf-8
  emit_state_every: 500
```


A full list of supported settings and capabilities for this
tap is available by running:

```bash
tap-jsonl --about
```

### Configure using environment variables

When `--config=ENV` is provided, this Singer tap automatically loads environment variables from the working
directory's `.env` file. A config value is then taken from a matching environment variable (for example,
`TAP_JSONL_PATH` for `path`) set either in the terminal session or in the `.env` file.

### Source Authentication and Authorization

<!--
Developer TODO: If your tap requires special access on the source system, or any special authentication requirements, provide those here.
-->

## Usage

You can easily run `tap-jsonl` by itself or in a pipeline using [Meltano](https://meltano.com/).

### Executing the Tap Directly

```bash
tap-jsonl --version
tap-jsonl --help
tap-jsonl --config CONFIG --discover > ./catalog.json
```

## Developer Resources

Follow these instructions to contribute to this project.

### Initialize your Development Environment

Prerequisites:

- Python 3.10+
- [uv](https://docs.astral.sh/uv/)

```bash
uv sync
```

### Create and Run Tests

Create tests within the `tests` subfolder and
then run:

```bash
uv run pytest
```

You can also test the `tap-jsonl` CLI interface directly using `uv run`:

```bash
uv run tap-jsonl --help
```

### Testing with [Meltano](https://www.meltano.com)

_**Note:** This tap will work in any Singer environment and does not require Meltano.
Examples here are for convenience and to streamline end-to-end orchestration scenarios._

<!--
Developer TODO:
Your project comes with a custom `meltano.yml` project file already created. Open the `meltano.yml` and follow any "TODO" items listed in
the file.
-->

Use Meltano to run an EL pipeline:

```bash
# Install meltano
uv tool install meltano

# Test invocation
meltano invoke tap-jsonl --version

# Run a test EL pipeline
meltano run tap-jsonl target-jsonl
```

### SDK Dev Guide

See the [dev guide](https://sdk.meltano.com/en/latest/dev_guide.html) for more instructions on how to use the SDK to
develop your own taps and targets.


# ---- file: meltano.yml
send_anonymous_usage_stats: true
project_id: "tap-jsonl"
default_environment: test
requires_meltano: ">=4.0"
environments:
  - name: test
plugins:
  extractors:
    - name: "tap-jsonl"
      namespace: "tap_jsonl"
      pip_url: -e .
      capabilities:
        - state
        - catalog
        - discover
        - about
        - stream-maps
        - structured-logging

      settings:
        - name: path
        - name: stream_name
        - name: primary_keys
        - name: encoding
        - name: state_strategy
        - name: emit_state_every

      config:
        path: ./data/**/*.jsonl
        stream_name: my_stream
        primary_keys: ["id"]
        encoding: utf-8
        state_strategy: line
        emit_state_every: 500

  loaders:
    - name: target-jsonl
      variant: andyh1203
      pip_url: target-jsonl


# ---- file: pyproject.toml
[project]
name = "tap-jsonl"
version = "0.0.1"
description = "Singer tap for Jsonl, built with the Meltano Singer SDK."
readme = "README.md"
authors = [{ name = "lc", email = "opny721@gmail.com" }]
keywords = [
    "ELT",
    "Jsonl",
]
classifiers = [
    "Intended Audience :: Developers",
    "License :: OSI Approved :: Apache Software License",
    "Operating System :: OS Independent",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: Python :: 3.12",
    "Programming Language :: Python :: 3.13",
    "Programming Language :: Python :: 3.14",
]
license = "Apache-2.0"
license-files = [ "LICENSE" ]
requires-python = ">=3.10"
dependencies = [
    "singer-sdk~=0.53.4",
    "typing-extensions>=4.5.0; python_version < '3.13'",
]

[project.optional-dependencies]
s3 = [
    "s3fs~=2025.10.0",
]

[project.scripts]
# CLI declaration
tap-jsonl = 'tap_jsonl.tap:TapJsonl.cli'

[dependency-groups]
dev = [
    { include-group = "test" },
    "python-semantic-release>=9.21.1",
]
test = [
    "pytest>=9",
    "pytest-github-actions-annotate-failures>=0.3",
    "singer-sdk[testing]",
]
typing = [
    "mypy>=1.16.0",
    "ty>=0.0.1-alpha.16",
    "types-requests",
]

[tool.pytest]
minversion = "9.0"
addopts = [
    "--durations=10",
]

[tool.mypy]
warn_unused_configs = true

[tool.ruff]
line-length = 100
required-version = ">=0.14"

[tool.ruff.lint]
future-annotations = true
ignore = [
    "COM812",  # missing-trailing-comma
]
select = ["ALL"]

[tool.ruff.lint.flake8-annotations]
allow-star-arg-any = true

[tool.ruff.lint.pydocstyle]
convention = "google"

[build-system]
requires = [
    "hatchling>=1,<2",
]
build-backend = "hatchling.build"

# tox configuration: runs the pytest suite on each supported Python version, plus the typing checks (mypy, ty) in the `typing` env
[tool.tox]
min_version = "4.22"
requires = [
    "tox>=4.22",
    "tox-uv",
]
env_list = [
    "typing",
    "py314",
    "py313",
    "py312",
    "py311",
    "py310",
]

[tool.tox.env_run_base]
runner = "uv-venv-lock-runner"
pass_env = [
    "GITHUB_*",
    "TAP_JSONL_*",
]
dependency_groups = [ "test" ]
commands = [ [ "pytest", { replace = "posargs", default = [ "tests" ], extend = true } ] ]

[tool.tox.env.typing]
dependency_groups = [ "test", "typing" ]
commands = [
    [ "mypy", { replace = "posargs", default = [ "tap_jsonl", "tests" ], extend = true } ],
    [ "ty", "check", { replace = "posargs", default = [ "tap_jsonl", "tests" ], extend = true } ],
]


[tool.semantic_release]
version_toml = ["pyproject.toml:project.version"]
update_pyproject_toml = true 
commit_parser = "conventional"

build_command = """
    uv lock  --upgrade-package "$PACKAGE_NAME"
    git add uv.lock
    uv build
"""

[tool.semantic_release.commit_parser_options]
minor_tags = ["feat"]
patch_tags = ["fix", "perf"]
parse_squash_commits = true
ignore_merge_commits = true

# ---- file: tap_jsonl/__init__.py
"""Tap for Jsonl."""


# ---- file: tap_jsonl/__main__.py
"""Jsonl entry point."""

from __future__ import annotations

from tap_jsonl.tap import TapJsonl

# Entry point for `python -m tap_jsonl`: hand control to the Singer SDK command-line interface.
TapJsonl.cli()


# ---- file: tap_jsonl/client.py
"""Helpers for streaming JSONL files."""

from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Iterator, Optional, Tuple


@dataclass(frozen=True)
class JsonlReadError:
    """Immutable description of a JSONL line that could not be read.

    NOTE(review): not referenced anywhere in the visible sources; presumably intended
    for collecting per-line read failures instead of only logging them.
    """

    # Path of the file the failing line came from.
    file_path: str
    # Line number within that file (presumably 1-based, like iter_jsonl_file's numbering).
    line_number: int
    # Raw text of the offending line.
    line: str
    # Human-readable description of the failure (e.g. the exception message).
    error: str


def iter_jsonl_file(
    *,
    file_path: str,
    encoding: str = "utf-8",
    start_line: int = 1,
    logger: Optional[Any] = None,
) -> Iterator[Tuple[Dict[str, Any], int]]:
    """Yield ``(record, line_number)`` pairs from a JSONL file, one line at a time.

    The file is read lazily, so memory use is bounded by the longest line rather than
    by the file size. Blank lines are skipped silently. Lines that are not valid JSON,
    or whose JSON value is not an object, are skipped and reported through ``logger``
    when one is given.

    Args:
        file_path: Path to the JSONL file.
        encoding: Text encoding used to decode the file.
        start_line: 1-based line number to start from (used for resuming state).
            Earlier lines are read but not parsed; blank lines still count.
        logger: Optional logger (e.g. the Singer SDK stream logger) used for warnings
            about skipped lines.

    Yields:
        Tuple of ``(record_dict, line_number)`` where ``line_number`` is 1-based.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file content cannot be decoded with ``encoding``.
    """
    # newline="" disables newline translation: lines still split on \n, \r\n and \r,
    # and the trailing terminator is removed by strip() below.
    with open(file_path, encoding=encoding, newline="") as f:
        for i, raw in enumerate(f, start=1):
            if i < start_line:
                continue

            line = raw.strip()
            if i == 1:
                # A UTF-8 byte-order mark decoded with plain "utf-8" survives as U+FEFF
                # (which strip() does not remove) and json.loads rejects it, so the
                # first record would otherwise be lost.
                line = line.lstrip("\ufeff").strip()
            if not line:
                continue

            try:
                obj = json.loads(line)
            except (ValueError, RecursionError) as e:
                # ValueError covers json.JSONDecodeError; RecursionError covers
                # pathologically nested input. Best-effort: warn and keep reading.
                if logger:
                    logger.warning(
                        "Skipping invalid JSON at %s:%s (%s)",
                        file_path,
                        i,
                        str(e),
                    )
                continue

            # Singer records must be JSON objects; skip arrays, scalars and null.
            if not isinstance(obj, dict):
                if logger:
                    logger.warning(
                        "Skipping non-object JSON at %s:%s (type=%s)",
                        file_path,
                        i,
                        type(obj).__name__,
                    )
                continue

            yield obj, i


# ---- file: tap_jsonl/streams.py
"""Stream type classes for tap-jsonl."""

from __future__ import annotations

from datetime import datetime, timezone
import os
import re
from pathlib import Path

from singer_sdk import Tap, typing as th
from singer_sdk.streams import Stream
from singer_sdk.helpers.types import Context

import typing as t

from tap_jsonl.client import iter_jsonl_file

# Synthetic columns added to every record emitted by the stream.
# Replication key: the source file's modification time as an ISO-8601 UTC string.
SDC_INCREMENTAL_KEY = "_sdc_last_modified"
# Path of the JSONL file the record was read from.
SDC_FILENAME = "_sdc_filename"
# Name of the Singer stream that emitted the record.
SDC_STREAM = "_sdc_stream"

ISO_DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")
ISO_DT_RE = re.compile(
    r"^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})?$"
)


def _looks_like_date(s: str) -> bool:
    return bool(ISO_DATE_RE.match(s))


def _looks_like_datetime(s: str) -> bool:
    return bool(ISO_DT_RE.match(s))


def to_iso8601(dt: datetime) -> str:
    """Render ``dt`` as an ISO-8601 string in UTC with whole-second precision.

    Sub-second digits are dropped (truncated, not rounded). A naive ``dt`` is
    interpreted as local time by ``astimezone``.
    """
    as_utc = dt.astimezone(timezone.utc)
    return as_utc.isoformat(timespec="seconds")


def parse_bookmark(val: str | None) -> datetime | None:
    """Parse a stored replication bookmark into an aware UTC datetime.

    Args:
        val: ISO-8601 string as written by ``to_iso8601`` (a ``Z``/``z`` suffix is
            also accepted), or ``None``/blank when no bookmark exists.

    Returns:
        The bookmark converted to UTC, or ``None`` when ``val`` is empty or blank.

    Raises:
        ValueError: If ``val`` is not a valid ISO-8601 date/time string.
    """
    if not val:
        return None
    clean = val.strip()
    if not clean:
        return None
    # datetime.fromisoformat only understands a "Z" suffix from Python 3.11 on.
    if clean[-1] in ("Z", "z"):
        clean = clean[:-1] + "+00:00"
    parsed = datetime.fromisoformat(clean)
    # Bookmarks are written in UTC by to_iso8601. A naive value would otherwise be
    # interpreted as *local* time by astimezone(), shifting the mtime comparison.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


class JsonlFileStream(Stream):
    """A single logical stream backed by one-or-many JSONL files.

    Every record read from the files matched by the ``path`` glob gets three synthetic
    columns: the file's modification time (the replication key), the file path and
    the stream name. Incremental sync works at whole-file granularity: a file whose
    mtime is not newer than the stored bookmark is skipped entirely.
    """

    def __init__(self, tap: Tap) -> None:
        """Create the stream from the tap's configuration.

        Args:
            tap: Parent tap; its ``config`` supplies ``stream_name``, ``path``,
                ``encoding``, ``primary_keys`` and ``emit_state_every``.
        """
        config = tap.config
        self.stream_name = config.get("stream_name") or "jsonl"
        # Set the attributes used by ``schema`` before the base constructor runs, so
        # they exist even if the SDK resolves the schema during initialization.
        self.file_path = config.get("path")
        self._encoding = config.get("encoding") or "utf-8"
        # NOTE(review): stored but not used anywhere in this class.
        self._emit_state_every = int(config.get("emit_state_every") or 500)

        super().__init__(tap, name=self.stream_name)

        self.primary_keys = self.config.get("primary_keys") or []

        self.state_partitioning_keys = [SDC_FILENAME, SDC_STREAM]
        self.replication_key = SDC_INCREMENTAL_KEY
        self.forced_replication_method = "INCREMENTAL"

    @property
    def is_sorted(self) -> bool:
        """Whether records arrive ordered by the replication key.

        Files are visited in path order, not mtime order, so ``_sdc_last_modified``
        is not monotonic across files and the stream reports itself as unsorted.
        """
        return False

    def _infer_schema(self, v: t.Any) -> th.JSONTypeHelper:
        """Map one sample JSON value to a nullable Singer SDK type helper.

        Strings shaped like ISO date-times / dates become ``date-time`` / ``date``.
        Objects recurse per key and allow additional properties. Arrays take the type
        of their first non-null element, defaulting to string. ``None`` and anything
        else map to string.
        """
        if isinstance(v, bool):
            # Checked first because bool is a subclass of int.
            return th.BooleanType(nullable=True)
        if isinstance(v, int):
            return th.IntegerType(nullable=True)
        if isinstance(v, float):
            return th.NumberType(nullable=True)
        if isinstance(v, str) and _looks_like_datetime(v):
            return th.DateTimeType(nullable=True)
        if isinstance(v, str) and _looks_like_date(v):
            return th.DateType(nullable=True)
        if isinstance(v, str):
            return th.StringType(nullable=True)

        if isinstance(v, dict):
            return th.ObjectType(
                *[th.Property(k, self._infer_schema(vv)) for k, vv in v.items()],
                additional_properties=True,
                nullable=True,
            )

        if isinstance(v, list):
            # Infer the array item type from the first non-null element.
            first = next((x for x in v if x is not None), None)
            item = (
                self._infer_schema(first)
                if first is not None
                else th.StringType(nullable=True)
            )
            return th.ArrayType(item, nullable=True)

        return th.StringType(nullable=True)

    def _first_record(self) -> dict[str, t.Any] | None:
        """Return the first non-empty record found across the matched files, or None.

        Only the first valid record of each file is considered; a file whose first
        record is an empty object is passed over in favour of the next file.
        """
        for p in self.get_files():
            for rec, _line in iter_jsonl_file(
                file_path=str(p), encoding=self._encoding, logger=self.logger
            ):
                if rec:
                    return rec
                break
        return None

    @property
    def schema(self) -> dict:
        """JSON Schema inferred from one sample record plus the synthetic columns.

        Only a single record is sampled, so fields that appear only in later records
        are not part of the schema. The result is cached in ``self._schema``.
        """
        cached = getattr(self, "_schema", None)
        if cached:
            return cached

        sample = self._first_record() or {}
        reserved = {SDC_INCREMENTAL_KEY, SDC_FILENAME, SDC_STREAM}
        props = [
            th.Property(k, self._infer_schema(v))
            for k, v in sample.items()
            # Synthetic columns are always (re)defined below with a fixed type.
            if k not in reserved
        ]

        props.extend(
            [
                th.Property(
                    SDC_INCREMENTAL_KEY,
                    th.DateTimeType(nullable=True),
                    description="Replication checkpoint (file mtime or row date)",
                ),
                th.Property(
                    SDC_FILENAME,
                    th.StringType(nullable=True),
                    description="Filename reference",
                ),
                th.Property(
                    SDC_STREAM,
                    th.StringType(nullable=True),
                    description="Stream (table_name) reference",
                ),
            ]
        )

        self._schema = th.PropertiesList(*props).to_dict()
        return self._schema

    def get_partition_name(self, filepath: str) -> str:
        """Return the absolute form of ``filepath`` (not resolved for symlinks)."""
        return str(Path(filepath).absolute())

    def get_partition_context(self, filepath: str) -> dict[str, t.Any]:
        """Return the partition context for one file.

        NOTE(review): not called anywhere in the visible sources; the stream does
        not define ``partitions``, so state is kept at stream level.
        """
        return {
            SDC_FILENAME: self.get_partition_name(filepath),
            SDC_STREAM: self.stream_name,
        }

    def get_files(self) -> t.List[Path]:
        """Resolve the configured ``path`` glob to a sorted list of regular files.

        ``~`` is expanded and ``**`` matches any number of directories. Absolute POSIX
        patterns are globbed from ``/``. Patterns ``pathlib`` cannot handle (e.g.
        Windows drive paths), or that match nothing, fall back to the ``glob`` module.

        Returns:
            Sorted, de-duplicated paths of existing files; empty when ``path`` is unset
            or nothing matches.
        """
        pattern = self.config.get("path")
        if not pattern:
            return []

        expanded = os.path.expanduser(pattern)
        try:
            if expanded.startswith("/"):
                matches = list(Path("/").glob(expanded.lstrip("/")))
            else:
                matches = list(Path().glob(expanded))
        except (NotImplementedError, ValueError):
            # pathlib rejects non-relative (e.g. drive-letter) and empty patterns;
            # let the glob module below handle them instead of aborting.
            matches = []

        if not matches:
            import glob as _glob

            matches = [Path(p) for p in _glob.glob(expanded, recursive=True)]

        # Directories can match patterns such as "**" and cannot be read as JSONL.
        return sorted({p for p in matches if p.is_file()})

    def get_records(self, context: Context | None) -> t.Iterable[dict]:
        """Yield records from every matched file, in path order.

        Args:
            context: Context supplied by the SDK. It is passed through unchanged (not
                replaced by ``{}``) so bookmark lookups hit the same state location
                the SDK writes to.
        """
        files = self.get_files()
        if not files:
            self.logger.warning("No files found for %s", self.file_path)
            return

        for filepath in sorted(files):
            yield from self._iter_file_records(str(filepath), context)

    def process_file(
        self,
        filepath: str,
        context: Context | None,
    ) -> list[dict]:
        """Read one file with bookmark awareness and return all its records.

        This materializes the whole file in memory; ``get_records`` streams the same
        records lazily instead.
        """
        return list(self._iter_file_records(filepath, context))

    def _iter_file_records(
        self,
        filepath: str,
        context: Context | None,
    ) -> t.Iterator[dict]:
        """Lazily yield one file's records, or nothing if it is not newer than the bookmark."""
        bookmark_dt = parse_bookmark(self.get_starting_replication_key_value(context))

        try:
            mtime: datetime = datetime.fromtimestamp(
                Path(filepath).stat().st_mtime, tz=timezone.utc
            ).replace(microsecond=0)
        except FileNotFoundError:
            # The file matched the glob but was removed before it could be read.
            self.logger.warning("Skipping %s (file no longer exists)", filepath)
            return

        self.logger.info(
            "Partition context: %s, last_bookmark=%s, mtime=%s",
            context,
            bookmark_dt,
            mtime,
        )

        # Whole-file incremental: skip files not modified after the bookmark.
        if bookmark_dt and mtime <= bookmark_dt:
            self.logger.info(
                "Skipping %s (mtime=%s <= bookmark=%s)", filepath, mtime, bookmark_dt
            )
            return

        # All records of a file share one replication value. The Singer SDK advances
        # the bookmark from each emitted record's replication key, so no manual
        # state increment is performed here.
        modified = to_iso8601(mtime)
        count = 0
        for record, _line in iter_jsonl_file(
            file_path=filepath,
            encoding=self._encoding,
            logger=self.logger,
        ):
            record[SDC_INCREMENTAL_KEY] = modified
            record[SDC_FILENAME] = filepath
            record[SDC_STREAM] = self.stream_name
            count += 1
            yield record

        if count:
            self.logger.info("Processed %d rows from %s", count, filepath)


# ---- file: tap_jsonl/tap.py
"""Jsonl tap class."""

from __future__ import annotations

import sys

from singer_sdk import Tap
from singer_sdk import typing as th  # JSON schema typing helpers

from tap_jsonl.streams import JsonlFileStream

if sys.version_info >= (3, 12):
    from typing import override
else:
    from typing_extensions import override


class TapJsonl(Tap):
    """Singer tap for JSONL files.

    Exposes a single stream (``JsonlFileStream``) fed by every file matching the
    configured ``path`` glob.
    """

    name = "tap-jsonl"

    config_jsonschema = th.PropertiesList(
        th.Property(
            "path",
            th.StringType(nullable=False),
            required=True,
            title="Path Glob",
            description="Glob pattern for JSONL files, e.g. /data/**/*.jsonl",
        ),
        th.Property(
            "stream_name",
            th.StringType(nullable=True),
            default="jsonl",
            title="Stream Name",
            description="Override the Singer stream name (default: jsonl).",
        ),
        th.Property(
            "primary_keys",
            th.ArrayType(th.StringType(nullable=False), nullable=True),
            default=[],
            title="Primary Keys",
            description=(
                "Optional list of primary key fields for the stream. "
                "Example: ['id'] or ['project_id','org_id']."
            ),
        ),
        th.Property(
            "encoding",
            th.StringType(nullable=True),
            default="utf-8",
            title="File Encoding",
            description="File encoding used when reading JSONL files (default: utf-8).",
        ),
        # The stream does not read this setting; the description says so instead of
        # promising per-line state tracking that is not implemented.
        th.Property(
            "state_strategy",
            th.StringType(nullable=True),
            default="line",
            title="State Strategy",
            description=(
                "Reserved for future use; currently ignored. Incremental state is based "
                "on each file's modification time (the _sdc_last_modified column)."
            ),
        ),
        # Read by the stream but not used to emit state.
        th.Property(
            "emit_state_every",
            th.IntegerType(nullable=True),
            default=500,
            title="Emit State Every N Records",
            description="Reserved for future use; currently ignored (default: 500).",
        ),
    ).to_dict()

    @override
    def discover_streams(self) -> list[JsonlFileStream]:
        """Return the tap's only stream, configured from this tap's settings."""
        return [JsonlFileStream(self)]


# Allow executing this module directly as a script.
if __name__ == "__main__":
    TapJsonl.cli()

