Metadata-Version: 2.4
Name: pysnmp-sync-adapter
Version: 1.0.5
Summary: Synchronous wrapper adapters for pysnmp asyncio HLAPI
Author: Ircama
License-Expression: EUPL-1.2
Project-URL: Homepage, https://github.com/Ircama/pysnmp-sync-adapter
Project-URL: Repository, https://github.com/Ircama/pysnmp-sync-adapter
Requires-Python: >=3.7
Description-Content-Type: text/markdown
License-File: LICENCE.txt
Requires-Dist: pysnmp>=5.0.0
Dynamic: license-file

# pysnmp-sync-adapter

[![PyPI](https://img.shields.io/pypi/v/pysnmp-sync-adapter.svg?maxAge=2592000)](https://pypi.org/project/pysnmp-sync-adapter/)
[![PyPI download month](https://img.shields.io/pypi/dm/pysnmp-sync-adapter.svg)](https://pypi.python.org/pypi/pysnmp-sync-adapter/)

**Lightweight Synchronous Adapter for PySNMP AsyncIO HLAPI**

---

This package provides lightweight, blocking wrappers around the `pysnmp.hlapi.v1arch.asyncio` and `pysnmp.hlapi.v3arch.asyncio` modules of PySNMP, enabling synchronous use of the SNMP high-level API in PySNMP v7+ without requiring direct `asyncio` management. It preserves the flexibility of PySNMP’s asyncio-based architecture and includes a compatibility layer for several legacy interfaces. Two additional functions, `parallel_get_sync` and `cluster_varbinds`, enable efficient concurrent SNMP queries in a blocking context.

## Features

- Drop-in synchronous alternatives to PySNMP's async-HLAPI: `get_cmd_sync`, `next_cmd_sync`, `set_cmd_sync`, `bulk_cmd_sync`, `walk_cmd_sync`, `bulk_walk_cmd_sync`.
- Supports both **v1arch** and **v3arch** PySNMP v7+ architectures, automatically selected or configurable via the `PYSNMP_ARCH` environment variable.
- Supports both **IPv4** and **IPv6** transport targets via `UdpTransportTarget` and `Udp6TransportTarget`.
- Reuses or creates the default **shared event loop** (`asyncio.get_event_loop()`), avoiding the cost of creating a new loop for each call.
- Sync wrappers accept an optional **`timeout`** parameter (in seconds) that limits the total execution time using `asyncio.wait_for()`.
- Minimizes connection overhead by **reusing pre-created transport instances** when calling `create_transport()`.
- The add-on `parallel_get_sync` function executes multiple SNMP GET requests concurrently in a blocking context. It is complemented by `cluster_varbinds`, an ancillary utility that normalizes and clusters OIDs into ordered chunks for efficient batching. Together, they enable high-throughput querying of large OID sets without requiring asyncio, reducing per-PDU overhead while preserving request order.
- In addition, through the `pysnmp_sync_adapter.legacy_wrappers` compatibility layer, it supports:
  - the [Python SNMP library](https://github.com/pysnmp/pysnmp) v5.0.24 HLAPI
  - the [`etingof/pysnmp`](https://github.com/etingof/pysnmp) v5 HLAPI
- Also, through the `pysnmp_sync_adapter.cmdgen_wrappers` compatibility layer, it runs code written against the `cmdgen` SNMP library appearing in `pysnmp.entity.rfc3413.oneliner`.

These adapters allow calling the familiar HLAPI functions in a purely synchronous style (e.g. in scripts, GUIs like Tkinter, or blocking contexts) without having to manage `asyncio` directly. This restores the synchronous experience familiar from earlier [PySNMP](https://github.com/lextudio/pysnmp) versions. Native sync HLAPI wrappers were [deprecated](https://github.com/lextudio/pysnmp/issues/104) in recent releases in favor of `asyncio`.

### Provided Methods

| Synchronous Function | AsyncIO Equivalent              |
| -------------------- | ------------------------------- |
| `get_cmd_sync`       | `get_cmd`                       |
| `next_cmd_sync`      | `next_cmd`                      |
| `set_cmd_sync`       | `set_cmd`                       |
| `bulk_cmd_sync`      | `bulk_cmd`                      |
| `walk_cmd_sync`      | `walk_cmd` (async-gen)          |
| `bulk_walk_cmd_sync` | `bulk_walk_cmd` (async-gen)     |
| `parallel_get_sync`  | parallel `get_cmd`              |
| `cluster_varbinds`   | Normalize and cluster OID lists |

## Quick Start

Using *v1arch*:

```python
from pysnmp.hlapi.v1arch.asyncio import *
from pysnmp_sync_adapter import get_cmd_sync, create_transport

err, status, index, var_binds = get_cmd_sync(
    SnmpDispatcher(),
    CommunityData('public', mpModel=0),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0'))
)

for name, val in var_binds:
    print(f'{name} = {val}')
```

Using *v3arch*:

```python
from pysnmp.hlapi.v3arch.asyncio import *
from pysnmp_sync_adapter import get_cmd_sync, create_transport

err, status, index, var_binds = get_cmd_sync(
    SnmpEngine(),
    CommunityData('public', mpModel=0),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0'))
)

for name, val in var_binds:
    print(f'{name} = {val}')
```

### `parallel_get_sync`

`parallel_get_sync` is an add-on introduced by *pysnmp-sync-adapter* that dispatches multiple SNMP GET operations in parallel while preserving PDU packing and result ordering. It supports an overall timeout and throttling.

```python
def parallel_get_sync(
    snmp_engine,
    auth_data,
    transport_target,
    *pdu_args,
    queries: list,
    timeout: float = None,
    max_parallel: int = None,
    **pdu_kwargs
) -> list[tuple]:
```

Execute a batch of SNMP GET requests concurrently.

Args:

```
  snmp_engine      – SnmpDispatcher() or SnmpEngine() instance
  auth_data        – CommunityData or UsmUserData
  transport_target – transport returned by create_transport()
  *pdu_args        – positional args before var-binds (e.g. ContextData for v3arch)
  queries          – list of either:
                      • ObjectType(...)  — each sent in its own PDU  
                      • [ObjectType(...), …] — grouped into a single PDU  
  timeout          – optional float, total seconds to wait for all requests
  max_parallel     – optional int, maximum number of PDUs in flight at once (throttle)
  **pdu_kwargs     – extra get_cmd() keyword args (e.g. lookupMib=False)
```

Returns a list of `(errorIndication, errorStatus, errorIndex, varBinds)` tuples in the **same order** as the `queries` list.

Behavior:

  1. Single `ObjectType` entries each become one PDU and run in parallel.
  2. Sub-lists of `ObjectType` are packed into one multi-OID PDU each.
  3. All PDUs fire concurrently with timeout and throttle via `asyncio.gather()`.
  4. Results are returned in order, matching `queries`.
  5. Optional `timeout` wraps the entire gather with `asyncio.wait_for()`.
  6. Optional `max_parallel` throttles how many SNMP PDUs are fired in parallel,
     so the agent is not overwhelmed.
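
The behavior listed above can be sketched with plain `asyncio`. This is a minimal illustration, not the adapter's code: `fake_get`, `gather_throttled` and `parallel_sketch` are hypothetical names, `fake_get` only simulates a GET with `asyncio.sleep()`, and a semaphore is assumed as the throttling mechanism.

```python
import asyncio
import functools

async def fake_get(query, delay):
    """Hypothetical stand-in for one SNMP GET PDU; returns an HLAPI-style 4-tuple."""
    await asyncio.sleep(delay)
    return (None, 0, 0, [(query, f"value-of-{query}")])

async def gather_throttled(jobs, max_parallel=None):
    """Run all jobs concurrently, with at most max_parallel in flight."""
    sem = asyncio.Semaphore(max_parallel) if max_parallel else None

    async def run(job):
        if sem is None:
            return await job()
        async with sem:
            return await job()

    # gather() returns results in the order the awaitables were passed in,
    # regardless of which one finishes first.
    return await asyncio.gather(*(run(job) for job in jobs))

def parallel_sketch(queries, timeout=None, max_parallel=None):
    """Blocking entry point: one simulated PDU per query."""
    count = len(queries)
    # Later queries get shorter delays, so they complete out of order.
    jobs = [
        functools.partial(fake_get, query, 0.01 * (count - i))
        for i, query in enumerate(queries)
    ]
    coro = gather_throttled(jobs, max_parallel)
    if timeout is not None:
        coro = asyncio.wait_for(coro, timeout)  # bounds the whole batch
    loop = asyncio.new_event_loop()  # private loop, to keep the sketch isolated
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()

results = parallel_sketch(["oid-a", "oid-b", "oid-c"], timeout=5, max_parallel=2)
for err_ind, err_stat, err_idx, var_binds in results:
    print(var_binds)
```

Even though the simulated requests finish in reverse order, the results come back in the order of the input list.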

Example (v1arch):

```python
from pysnmp.hlapi.v1arch.asyncio import (
    SnmpDispatcher, CommunityData, UdpTransportTarget,
    ObjectType, ObjectIdentity
)
from pysnmp_sync_adapter import create_transport, parallel_get_sync

engine    = SnmpDispatcher()
community = CommunityData("public", mpModel=0)
transport = create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2)

queries = [
  ObjectType(ObjectIdentity("1.3.6.1.2.1.1.1.0")),             # single-OID
  [
    ObjectType(ObjectIdentity("1.3.6.1.2.1.1.3.0")),
    ObjectType(ObjectIdentity("1.3.6.1.2.1.1.5.0"))
  ],                                                           # grouped OIDs
  ObjectType(ObjectIdentity("1.3.6.1.2.1.2.2.1.2.1"))          # another single-OID
]

results = parallel_get_sync(
  engine,
  community,
  transport,
  queries=queries,  # keyword-only argument
  timeout=5,
  lookupMib=False
)

for errInd, errStat, errIdx, varBinds in results:
    if errInd or errStat:
        print("Error:", errInd or errStat)
    else:
        for oid, val in varBinds:
            print(f"{oid} = {val}")
```

Example (v3arch):

```python
from pysnmp.hlapi.v3arch.asyncio import (
    SnmpEngine, UsmUserData, ContextData,
    UdpTransportTarget, ObjectType, ObjectIdentity
)
from pysnmp_sync_adapter import create_transport, parallel_get_sync

engine    = SnmpEngine()  # v3arch uses SnmpEngine rather than SnmpDispatcher
auth      = UsmUserData("usr", authKey=b"abc", privKey=b"xyz")
context   = ContextData()
transport = create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2)

results = parallel_get_sync(
  engine,
  auth,
  transport,
  context,
  queries=[
    ObjectType(ObjectIdentity("1.3.6.1.2.1.1.1.0")),
    ObjectType(ObjectIdentity("1.3.6.1.2.1.1.3.0"))
  ],
  timeout=5
)
```

### `cluster_varbinds`

`cluster_varbinds` is an ancillary function that normalizes and clusters queries into flat chunks of up to `max_per_pdu` var-binds per PDU.

It converts a list of OIDs into ordered sublists, each representing one SNMP PDU with no more than `max_per_pdu` var-binds. This enables efficient batching and parallel dispatch via `parallel_get_sync`, improving throughput by reducing per-PDU overhead while preserving request order.

Depending on the use case, the performance improvement can be significant.

Example of usage:

```python
from pysnmp_sync_adapter import cluster_varbinds, parallel_get_sync

# OID strings grouped per logical query
# (engine, auth and transport are created as in the parallel_get_sync examples)
raw_queries = [
    ['1.3.6.1.2.1.1.1.0'],
    ['1.3.6.1.2.1.1.2.0', '1.3.6.1.2.1.1.3.0'],
    ['1.3.6.1.2.1.1.4.0']
]

# Wrap each OID string in ObjectType(ObjectIdentity(...))
wrapped_queries = [
    [ObjectType(ObjectIdentity(x)) for x in group]
    for group in raw_queries
]

# Re-pack into PDUs of up to 10 var-binds; this can give around a 4x speedup
wrapped_queries = cluster_varbinds(wrapped_queries, max_per_pdu=10)

raw_results = parallel_get_sync(
    engine,
    auth,
    transport,
    queries=wrapped_queries,
    max_parallel=5
)
```

Description:

```python
def cluster_varbinds(
    queries: Sequence[Union[ObjectType, Sequence[ObjectType]]],
    max_per_pdu: int
) -> List[List[ObjectType]]:
```

**Parameters:**

* `queries` (`Sequence[Union[ObjectType, Sequence[ObjectType]]]`):
  A mixed list where each element is either a single `ObjectType` or a list/tuple of them.
* `max_per_pdu` (`int`):
  Maximum number of var-binds to include in each PDU. Must be >= 1.

**Returns:**

* `List[List[ObjectType]]`: A list of flat sub-lists, each containing up to `max_per_pdu` `ObjectType` instances, preserving the original order.

Usage:

```python
from pysnmp.hlapi.v1arch.asyncio import ObjectType, ObjectIdentity
from pysnmp_sync_adapter import cluster_varbinds

# Prepare a mixed sequence of queries
raw_queries = [
    ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0')),
    [
        ObjectType(ObjectIdentity('1.3.6.1.2.1.1.2.0')),
        ObjectType(ObjectIdentity('1.3.6.1.2.1.1.3.0'))
    ],
    ObjectType(ObjectIdentity('1.3.6.1.2.1.1.4.0'))
]

# Chunk into PDUs of max 2 var-binds each
pdus = cluster_varbinds(raw_queries, max_per_pdu=2)
# pdus == [
#   [OT('1.3.6.1.2.1.1.1.0'), OT('1.3.6.1.2.1.1.2.0')],
#   [OT('1.3.6.1.2.1.1.3.0'), OT('1.3.6.1.2.1.1.4.0')]
# ]
```
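
The flatten-then-chunk idea described above can be sketched in a few lines of plain Python. This is not the adapter's implementation: `cluster_sketch` is a hypothetical name, plain strings stand in for `ObjectType` instances, and raising `ValueError` for `max_per_pdu < 1` is an assumption.

```python
def cluster_sketch(queries, max_per_pdu):
    """Flatten a mixed list and split it into ordered chunks of max_per_pdu items."""
    if max_per_pdu < 1:
        raise ValueError("max_per_pdu must be >= 1")  # assumed error handling

    flat = []
    for item in queries:
        if isinstance(item, (list, tuple)):
            flat.extend(item)   # a group contributes all of its members
        else:
            flat.append(item)   # a single entry contributes itself

    # Slice the flat list into consecutive chunks, preserving order.
    return [flat[i:i + max_per_pdu] for i in range(0, len(flat), max_per_pdu)]

# Strings stand in for ObjectType instances.
chunks = cluster_sketch(["1.0", ["2.0", "3.0"], "4.0"], max_per_pdu=2)
print(chunks)
```

Note that the original grouping is not kept: a group can be split across two chunks when it straddles a chunk boundary, as happens to `"2.0"` and `"3.0"` here.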

### `create_transport`

`create_transport` synchronously awaits the async `create()` factory of the given transport class (`UdpTransportTarget` or `Udp6TransportTarget`) and returns a ready-to-use transport object. `timeout` and `retries` are passed to the factory only when they are not `None`.

```python
def create_transport(
    transport_cls, *addr, timeout=None, retries=None, **other_kwargs
)
```

Example for IPv4:

```python
create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2)
```

Example for IPv6:

```python
create_transport(Udp6TransportTarget, ("2001:db8::1", 161), timeout=2)
```
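
The "pass `timeout` and `retries` only when set" behavior can be illustrated as follows. This is a hedged sketch, not the adapter's code: `FakeTransport` is a hypothetical stand-in for a PySNMP transport class with an async `create()` factory, `create_transport_sketch` is an illustrative name, and a private event loop is used here instead of the shared one.

```python
import asyncio

class FakeTransport:
    """Hypothetical stand-in for a transport class with an async create() factory."""

    def __init__(self, addr, **options):
        self.addr = addr
        self.options = options

    @classmethod
    async def create(cls, addr, **options):
        await asyncio.sleep(0)  # pretend to do asynchronous setup work
        return cls(addr, **options)

def create_transport_sketch(transport_cls, *addr, timeout=None, retries=None, **other):
    """Block until transport_cls.create() finishes and return the transport."""
    options = dict(other)
    # Forward timeout/retries only when given, so the class defaults still apply.
    if timeout is not None:
        options["timeout"] = timeout
    if retries is not None:
        options["retries"] = retries

    loop = asyncio.new_event_loop()  # private loop, to keep the sketch isolated
    try:
        return loop.run_until_complete(transport_cls.create(*addr, **options))
    finally:
        loop.close()

target = create_transport_sketch(FakeTransport, ("192.0.2.1", 161), timeout=2)
print(target.addr, target.options)
```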

### Internal Utilities

- `ensure_loop()` — Retrieves the current default event loop via `asyncio.get_event_loop()`, or creates and sets one if none exists. Ensures one loop is available per thread.
- `_sync_coro()` — Executes a coroutine to completion on the shared event loop, with optional timeout support via `asyncio.wait_for()`. Handles already-running loops by scheduling a future.
- `_sync_agen()` — Collects all items from an async generator (e.g., `walk_cmd`) into a list by internally awaiting it with `_sync_coro()`.
- `make_sync()` — Higher-order function that wraps PySNMP async-HLAPI coroutines into synchronous functions, propagating optional `timeout` arguments.

By avoiding per-call event loop instantiation and by reusing transport targets, this implementation significantly reduces runtime overhead in tight polling or query loops.
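
How these utilities fit together can be sketched with the standard library alone. The code below is an illustration under stated assumptions, not the adapter's source: the function bodies are guesses at the general pattern, `fake_get_cmd` and `fake_walk_cmd` are hypothetical stand-ins for PySNMP coroutines, and the already-running-loop case mentioned above is deliberately left out.

```python
import asyncio
import functools
import warnings

def ensure_loop():
    """Return the thread's default loop, creating and registering one if needed."""
    try:
        with warnings.catch_warnings():
            # Some Python versions warn when get_event_loop() has to create a loop.
            warnings.simplefilter("ignore", DeprecationWarning)
            loop = asyncio.get_event_loop()
        if loop.is_closed():
            raise RuntimeError("default loop is closed")
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop

def sync_coro(coro, timeout=None):
    """Run a coroutine to completion on the shared loop, with an optional timeout."""
    if timeout is not None:
        coro = asyncio.wait_for(coro, timeout)
    return ensure_loop().run_until_complete(coro)

def sync_agen(agen, timeout=None):
    """Collect every item of an async generator into a list."""
    async def collect():
        return [item async for item in agen]
    return sync_coro(collect(), timeout)

def make_sync(async_fn):
    """Wrap an async function into a blocking one that accepts timeout=..."""
    @functools.wraps(async_fn)
    def wrapper(*args, timeout=None, **kwargs):
        return sync_coro(async_fn(*args, **kwargs), timeout)
    return wrapper

async def fake_get_cmd(oid):
    """Hypothetical stand-in for an async HLAPI call."""
    await asyncio.sleep(0)
    return (None, 0, 0, [(oid, "demo")])

async def fake_walk_cmd(count):
    """Hypothetical stand-in for an async-generator HLAPI call."""
    for i in range(count):
        await asyncio.sleep(0)
        yield i

fake_get_cmd_sync = make_sync(fake_get_cmd)
single = fake_get_cmd_sync("1.3.6.1.2.1.1.1.0", timeout=1)
walked = sync_agen(fake_walk_cmd(3))
same_loop = ensure_loop() is ensure_loop()  # the loop is reused, not recreated
```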

---

## Installation

```bash
pip install pysnmp-sync-adapter
```

This will automatically install the latest version of `pysnmp` as a dependency, if it is not already present.

## Usage

To ensure compatibility with the selected PySNMP architecture (`v1arch` or `v3arch`), make sure to import `pysnmp.hlapi.v3arch.asyncio` (or `v1arch`) **before** importing from `pysnmp_sync_adapter`. For example:

```python
from pysnmp.hlapi.v3arch.asyncio import *  # Must come first (or v1arch)

from pysnmp_sync_adapter import (
    get_cmd_sync,
    next_cmd_sync,
    set_cmd_sync,
    bulk_cmd_sync,
    walk_cmd_sync,
    bulk_walk_cmd_sync,
    create_transport
)
```

This ensures that the adapter binds to the appropriate internal PySNMP modules. If the import is omitted or placed in the wrong order, `pysnmp_sync_adapter` may fall back to `v1arch` even when `v3arch` is desired.

Alternatively, the environment variable *PYSNMP_ARCH* can be set to *"v3arch"* (or *"v1arch"*). Example:

```python
import os
os.environ["PYSNMP_ARCH"] = "v3arch"  # or "v1arch"

from pysnmp_sync_adapter import get_cmd_sync  # etc.
```

This method is particularly useful in larger applications or testing scenarios where import order might be harder to control.
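
One way to picture the selection rules described in this section is the sketch below. It is purely illustrative and not the adapter's code: `pick_arch` is a hypothetical function, and the assumption that `PYSNMP_ARCH` takes precedence over already-imported modules is not confirmed by this document.

```python
import os
import sys

def pick_arch(environ=None, modules=None):
    """Return "v1arch" or "v3arch" from the environment or the imported modules."""
    environ = os.environ if environ is None else environ
    modules = sys.modules if modules is None else modules

    # 1. An explicit request via the environment variable (assumed to win).
    requested = environ.get("PYSNMP_ARCH", "").strip().lower()
    if requested in ("v1arch", "v3arch"):
        return requested

    # 2. Otherwise, look at what the caller imported before the adapter.
    if "pysnmp.hlapi.v3arch.asyncio" in modules:
        return "v3arch"

    # 3. Fall back to v1arch, matching the fallback described above.
    return "v1arch"

print(pick_arch({"PYSNMP_ARCH": "v3arch"}, {}))
print(pick_arch({}, {"pysnmp.hlapi.v3arch.asyncio": object()}))
print(pick_arch({}, {}))
```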

**Note:** Using both `v1arch` and `v3arch` sequentially in the same program is unusual. When it is needed, purging the relevant modules from `sys.modules` before re-importing them has been verified to work:

```python
import sys

# Drop the cached PySNMP HLAPI and adapter modules so they can be re-imported
for mod in list(sys.modules):
    if mod.startswith("pysnmp.hlapi.") or mod.startswith("pysnmp_sync_adapter"):
        del sys.modules[mod]
```

### High-level v1arch sync

```python
import asyncio
import platform
from pysnmp.hlapi.v1arch.asyncio import *
from pysnmp_sync_adapter import (
    get_cmd_sync,
    next_cmd_sync,
    set_cmd_sync,
    bulk_cmd_sync,
    walk_cmd_sync,
    bulk_walk_cmd_sync,
    create_transport
)


if platform.system() == "Windows":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

community = "public"
dispatcher = SnmpDispatcher()
auth_data = CommunityData(community, mpModel=0)

print("\n--> get_cmd_sync")
error_indication, error_status, error_index, var_binds = get_cmd_sync(
    dispatcher,
    auth_data,
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0)),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> set_cmd_sync")
error_indication, error_status, error_index, var_binds = set_cmd_sync(
    dispatcher,
    auth_data,
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0), "Linux i386"),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> next_cmd_sync")
error_indication, error_status, error_index, var_binds = next_cmd_sync(
    dispatcher,
    auth_data,
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "system")),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> bulk_cmd_sync")
error_indication, error_status, error_index, var_binds = bulk_cmd_sync(
    dispatcher,
    CommunityData("public"),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    0,
    2,
    ObjectType(ObjectIdentity("SNMPv2-MIB", "system")),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> walk_cmd_sync")
objects = walk_cmd_sync(
    dispatcher,
    auth_data,
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr")),
    timeout=30  # Notice that this optional timeout is added to the adapter
)
for error_indication, error_status, error_index, var_binds in objects:
    for name, val in var_binds:
        print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> bulk_walk_cmd_sync")
objects = bulk_walk_cmd_sync(
    dispatcher,
    CommunityData("public"),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    0,
    25,
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr")),
    timeout=30  # Notice that this optional timeout is added to the adapter
)
for error_indication, error_status, error_index, var_binds in objects:
    for name, val in var_binds:
        print(name.prettyPrint(), "=", val.prettyPrint())
```

### High-level v3arch sync

```python
import asyncio
import platform
from pysnmp.hlapi.v3arch.asyncio import *
from pysnmp_sync_adapter import (
    get_cmd_sync,
    next_cmd_sync,
    set_cmd_sync,
    bulk_cmd_sync,
    walk_cmd_sync,
    bulk_walk_cmd_sync,
    create_transport
)

if platform.system() == "Windows":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

community = "public"
engine = SnmpEngine()

print("\n--> get_cmd_sync")
error_indication, error_status, error_index, var_binds = get_cmd_sync(
    engine,
    CommunityData(community),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0)),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> set_cmd_sync")
error_indication, error_status, error_index, var_binds = set_cmd_sync(
    engine,
    CommunityData(community),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0), "Linux i386"),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> next_cmd_sync")
error_indication, error_status, error_index, var_binds = next_cmd_sync(
    engine,
    CommunityData(community),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "system")),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> bulk_cmd_sync")
error_indication, error_status, error_index, var_binds = bulk_cmd_sync(
    engine,
    CommunityData("public"),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    0,
    2,
    ObjectType(ObjectIdentity("SNMPv2-MIB", "system")),
)
print(error_indication, error_status, error_index)
for name, val in var_binds:
    print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> walk_cmd_sync")
objects = walk_cmd_sync(
    engine,
    CommunityData(community),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr")),
    timeout=30  # Notice that this optional timeout is added to the adapter
)
for error_indication, error_status, error_index, var_binds in objects:
    for name, val in var_binds:
        print(name.prettyPrint(), "=", val.prettyPrint())

print("\n--> bulk_walk_cmd_sync")
objects = bulk_walk_cmd_sync(
    engine,
    CommunityData("public"),
    create_transport(UdpTransportTarget, ("demo.pysnmp.com", 161), timeout=2),
    ContextData(),
    0,
    25,
    ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr")),
    timeout=30  # Notice that this optional timeout is added to the adapter
)
for error_indication, error_status, error_index, var_binds in objects:
    for name, val in var_binds:
        print(name.prettyPrint(), "=", val.prettyPrint())
```

---------------------------

## Supporting other libraries

This adapter provides compatibility for code written against other libraries using synchronous SNMP commands. It allows legacy SNMPv1/v2c/v3 code to run unchanged for backward compatibility, while taking advantage of simplified synchronous operation.

The compatibility layer implements the following wrappers:

| Function                  | Description                                                               |
| ------------------------- | ------------------------------------------------------------------------- |
| `getCmd(...)`             | Yields a single `(errInd, errStat, errIdx, varBinds)` from `get_cmd_sync` |
| `setCmd(...)`             | Same as `getCmd` but for `set_cmd_sync`                                   |
| `nextCmd(...)`            | Uses `next_cmd_sync`                                                      |
| `bulkCmd(...)`            | Uses `bulk_cmd_sync`                                                      |
| `walkCmd(...)`            | Uses `walk_cmd_sync`                                                      |
| `bulkWalkCmd(...)`        | Uses `bulk_walk_cmd_sync`                                                 |
| `Udp6TransportTarget(...)`| Legacy-compatible wrapper                                                 |
| `UdpTransportTarget(...)` | Legacy-compatible wrapper                                                 |

These wrappers preserve the iterator-based usage of `pysnmp.hlapi` but operate using blocking, synchronous calls underneath.
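
The iterator-over-blocking-call pattern can be sketched with generator functions. This is an assumption about the general shape, not the adapter's code: `as_single_result_iterator`, `as_multi_result_iterator` and the two `fake_*_sync` functions are hypothetical names used only for illustration.

```python
def as_single_result_iterator(sync_fn):
    """Wrap a blocking call returning one 4-tuple into a one-item iterator."""
    def wrapper(*args, **kwargs):
        yield sync_fn(*args, **kwargs)
    return wrapper

def as_multi_result_iterator(sync_fn):
    """Wrap a blocking call returning a list of 4-tuples into an iterator."""
    def wrapper(*args, **kwargs):
        yield from sync_fn(*args, **kwargs)
    return wrapper

def fake_get_cmd_sync(oid):
    """Hypothetical stand-in for a blocking GET."""
    return (None, 0, 0, [(oid, "demo")])

def fake_walk_cmd_sync(oids):
    """Hypothetical stand-in for a blocking walk, returning one tuple per OID."""
    return [(None, 0, 0, [(oid, "demo")]) for oid in oids]

getCmd = as_single_result_iterator(fake_get_cmd_sync)
walkCmd = as_multi_result_iterator(fake_walk_cmd_sync)

# Legacy-style loops work unchanged on top of the blocking calls.
for errorIndication, errorStatus, errorIndex, varBinds in getCmd("1.3.6.1.2.1.1.1.0"):
    print(varBinds)

get_responses = list(getCmd("1.3.6.1.2.1.1.1.0"))
walk_responses = list(walkCmd(["1.3.6.1.2.1.1.1.0", "1.3.6.1.2.1.1.5.0"]))
```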

### Support of the Python SNMP library v5.0.24 HLAPI

Compatibility layer for code written against the **Python SNMP library** v5 HLAPI (https://github.com/pysnmp/pysnmp) using synchronous SNMP commands.

This library uses `SnmpEngine()` and `ContextData()`. It requires `legacy_wrappers` with `v3arch`.

#### Example Usage

```python
from pysnmp.hlapi.v3arch.asyncio import *
from pysnmp_sync_adapter.legacy_wrappers import UdpTransportTarget, getCmd

for errorIndication, errorStatus, errorIndex, varBinds in getCmd(
    SnmpEngine(),
    CommunityData('public', mpModel=0),
    UdpTransportTarget(("demo.pysnmp.com", 161)),
    ContextData(),
    ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0'))
):
    if errorIndication:
        print(errorIndication, errorStatus, errorIndex, varBinds)
    elif errorStatus:
        print(errorIndication, errorStatus, errorIndex, varBinds)
    else:
        for name, val in varBinds:
            print(name, "=", val)
```

### Support of legacy `etingof/pysnmp` v5 HLAPI

Compatibility layer for code written against the legacy **etingof/pysnmp** v5 HLAPI (https://github.com/etingof/pysnmp) using synchronous SNMP commands.

This library uses `SnmpDispatcher()` and does not use `ContextData()`. It requires `legacy_wrappers` with `v1arch`.

#### Example Usage

```python
from pysnmp.hlapi.v1arch.asyncio import *
from pyasn1.type.univ import OctetString as OctetStringType
from pysnmp_sync_adapter.legacy_wrappers import UdpTransportTarget, getCmd

timeout = 2
retries = 2
iterator = getCmd(
    SnmpDispatcher(),
    CommunityData('public', mpModel=0),
    UdpTransportTarget(
        ("demo.pysnmp.com", 161),
        timeout,  # optional parameter
        retries  # optional parameter
    ),
    ('1.3.6.1.2.1.1.1.0', None)
)

for response in iterator:
    errorIndication, errorStatus, errorIndex, varBinds = response
    if errorIndication:
        print(errorIndication, errorStatus, errorIndex, varBinds)
    elif errorStatus:
        print(errorIndication, errorStatus, errorIndex, varBinds)
    else:
        for varBind in varBinds:
            print(' = '.join([x.prettyPrint() for x in varBind]))
```

#### Notes

##### `UdpTransportTarget`

The adapter supports two legacy initialization forms:

```python
UdpTransportTarget(("host", port), timeout, retries)
UdpTransportTarget(("host", port, timeout, retries))
```

Both forms correctly map to the underlying transport constructor, omitting `timeout` and `retries` if `None`.
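
As a rough illustration of how the two forms could be reduced to a single shape, consider the sketch below. `normalize_target_args` is a hypothetical helper, not part of the adapter, and the output format (an address tuple plus an options dictionary) is an assumption chosen for clarity.

```python
def normalize_target_args(addr, timeout=None, retries=None):
    """Accept either legacy form and return ((host, port), options)."""
    host, port, *rest = addr
    if rest:
        # Second form: timeout and retries are packed into the address tuple.
        timeout = rest[0]
        if len(rest) > 1:
            retries = rest[1]

    # Omit timeout/retries when they are None.
    options = {}
    if timeout is not None:
        options["timeout"] = timeout
    if retries is not None:
        options["retries"] = retries
    return (host, port), options

print(normalize_target_args(("host", 161), 2, 3))
print(normalize_target_args(("host", 161, 2, 3)))
print(normalize_target_args(("host", 161)))
```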

##### errorIndication

If `errorIndication` is set, it is returned as a string.

In addition, if the message includes `"before timeout"`, `" - timed out"` is appended to it so that legacy code matching on that text keeps working.
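
The message handling described above amounts to a small string transformation, sketched here. `normalize_error_indication` is a hypothetical name, and the sample message is only an example of text containing `"before timeout"`.

```python
def normalize_error_indication(error_indication):
    """Return the error indication as a string, tagging timeout messages."""
    if not error_indication:
        return error_indication  # nothing to report, pass through unchanged
    message = str(error_indication)
    if "before timeout" in message:
        message += " - timed out"
    return message

print(normalize_error_indication("No SNMP response received before timeout"))
```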

### Support of the cmdgen SNMP library appearing in pysnmp oneliner

Compatibility layer for code written against the `cmdgen` SNMP library appearing in `pysnmp.entity.rfc3413.oneliner`, using synchronous SNMP commands.

It needs `v3arch`, transparently inserts the required `SnmpEngine()` and
`ContextData()` parameters for SNMPv3 (`v3arch.asyncio`) calls, and wraps OID tuples in
`ObjectType(ObjectIdentity(...))`. Legacy UDP and UDP6 transports, including timeout and retry
arguments, are preserved.

#### Example Usage

```python
from pysnmp.hlapi.v3arch.asyncio import *
import pysnmp_sync_adapter.cmdgen_wrappers as cmdgen
cmd_gen = cmdgen.CommandGenerator()
transport = cmdgen.UdpTransportTarget(("demo.pysnmp.com", 161), timeout=5, retries=1)
oid = '1.3.6.1.2.1.1.1.0'
oid_tuple = tuple(int(part) for part in oid.split('.'))
comm_data = cmdgen.CommunityData('public', mpModel=0)
error_indication, error_status, error_index, var_binds = cmd_gen.getCmd(
    comm_data,
    transport,
    oid_tuple
)
for name, val in var_binds:
    print(f'{name} = {val}')
```

Other example:

```python
from pysnmp.hlapi.v3arch.asyncio import *
import pysnmp_sync_adapter.cmdgen_wrappers as cmdgen
_oids = ('1.3.6.1.2.1.1.1.0', '1.3.6.1.2.1.1.4.0',
         '1.3.6.1.2.1.1.5.0', '1.3.6.1.2.1.1.6.0')
user = 'myUser'
authKey = 'authPassword'
privKey = 'privPassword'
authProto = usmHMACSHAAuthProtocol
privProto = usmAesCfb128Protocol
cmdGen = cmdgen.CommandGenerator()
cmdGen.getCmd(
    cmdgen.UsmUserData(
        user, authKey=authKey, privKey=privKey,
        authProtocol=authProto, privProtocol=privProto
    ),
    cmdgen.UdpTransportTarget(("demo.pysnmp.com", 161)),
    *(ObjectType(ObjectIdentity(oid)) for oid in _oids)
)
)
```

---------------------------

## Notes and limitations

- These adapters block the calling thread until the SNMP operation completes.
- They rely on the default `asyncio` event loop obtained via `asyncio.get_event_loop()`. If no loop is set, one is created and registered. They do not create isolated loops.
- Since PySNMP uses the default event loop bound to the current thread, invoking these synchronous wrappers from a thread that is already running an event loop **may cause deadlocks or `RuntimeError`**. To use them safely in such environments, run them from a separate thread. Mixing native async code with `*_sync` calls in the same application is strongly discouraged unless carefully managed, because triggering a `*_sync` query while the asyncio event loop is running may cause undefined behavior or deadlocks.
- A **timeout** (in seconds) can be optionally passed to all sync wrappers; it limits the total wall-clock time of the SNMP operation using `asyncio.wait_for()`. On timeout, `asyncio.TimeoutError` is raised.
- The underlying transport layer’s timeouts (e.g. `UdpTransportTarget(..., timeout=2)`) still apply, and should be set appropriately to avoid low-level blocking.
- These wrappers do **not** forcibly cancel low-level transport operations. A timeout interrupts the coroutine, but not the transport at the OS level.
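
The two points about running from a separate thread and handling `asyncio.TimeoutError` can be sketched together. This is an illustration only: `blocking_query` is a hypothetical stand-in for a `*_sync` call (it simulates the SNMP operation with `asyncio.sleep()` on a loop private to the worker thread), and `run_in_executor()` is one assumed way to keep the blocking call off the thread that runs the application's event loop.

```python
import asyncio

def blocking_query(delay, timeout=None):
    """Hypothetical stand-in for a *_sync call, using a loop private to this thread."""
    async def fake_snmp_operation():
        await asyncio.sleep(delay)
        return "done"

    coro = fake_snmp_operation()
    if timeout is not None:
        coro = asyncio.wait_for(coro, timeout)  # bounds total wall-clock time
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()

async def async_caller():
    """Async application code that needs a blocking query."""
    loop = asyncio.get_running_loop()

    # Run the blocking call in a worker thread so this loop is never re-entered.
    ok = await loop.run_in_executor(None, blocking_query, 0.01, 1.0)

    # A query that exceeds its timeout surfaces as asyncio.TimeoutError.
    try:
        await loop.run_in_executor(None, blocking_query, 1.0, 0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return ok, timed_out

outcome = asyncio.run(async_caller())
print(outcome)
```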

This repository uses the public SNMP simulation service at **`demo.pysnmp.com`**, provided courtesy of [Lextudio](https://docs.lextudio.com/snmp/snmp-simulation-service). Please ensure network access to `demo.pysnmp.com:161` is available when running the tests (`python -m pytest`).

## Reference documentation

* **PySNMP**:
  [https://docs.lextudio.com/snmp/](https://docs.lextudio.com/snmp/)

* **PySNMP API Reference**:
  [https://docs.lextudio.com/pysnmp/v7.1/docs/api-reference](https://docs.lextudio.com/pysnmp/v7.1/docs/api-reference)

## License

EUPL-1.2 License - See [LICENCE](LICENCE.txt) for details.
