from collections.abc import Mapping
from typing import Optional
class QdAnnealingIR(Mapping):
    """Intermediate representation (QUBO) for quantum annealing problems.

    Carries over the QUBO conversion and arithmetic features formerly
    provided by ``QuDataInput``.

    The model is stored in ``qubo``, a dict whose keys are tuples of variable
    names and whose values are the coefficients.  A linear term on ``x`` is
    stored under the diagonal key ``(x, x)``.

    Third-party backends (pulp, amplify, pyqubo, numpy, networkx, pandas,
    dimod, sympy) are imported inside the method that needs them, so only the
    backends that are actually called have to be installed.

    The ``from_*`` methods replace ``self.qubo`` and return ``self`` so calls
    can be chained; the ``to_*`` methods do not modify the instance.
    """

    # ------------------------------
    # Constructor & operator overloads
    # ------------------------------
    def __init__(self, qubo: Optional[dict] = None):
        """Create an IR from an existing QUBO dict, or an empty one.

        Args:
            qubo: Dict mapping variable-name tuples to coefficients.  The
                dict is stored as-is (it is not copied).  ``None`` creates
                an empty QUBO.

        Raises:
            TypeError: If ``qubo`` is neither ``None`` nor a ``dict``.
        """
        if qubo is None:
            qubo = {}
        elif not isinstance(qubo, dict):
            raise TypeError(f"{type(qubo)}は対応していない型です。")
        self.qubo: dict = qubo

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.qubo!r})"

    @staticmethod
    def _merge_dict(a: dict, b: dict, op):
        """Merge two QUBO dicts, combining coefficients with ``op``.

        Keys present only in ``a`` keep their value; keys present only in
        ``b`` get ``op(0, value)`` so that subtraction negates them.
        Neither input dict is modified.
        """
        result = a.copy()
        for k, v in b.items():
            result[k] = op(result.get(k, 0), v) if k in result else op(0, v)
        return result

    def __add__(self, other: "QdAnnealingIR") -> "QdAnnealingIR":
        """Return the term-wise sum; keys are matched exactly (order matters)."""
        return QdAnnealingIR(
            self._merge_dict(self.qubo, other.qubo, lambda x, y: x + y)
        )

    def __sub__(self, other: "QdAnnealingIR") -> "QdAnnealingIR":
        """Return the term-wise difference; keys are matched exactly."""
        return QdAnnealingIR(
            self._merge_dict(self.qubo, other.qubo, lambda x, y: x - y)
        )

    def __mul__(self, other: "QdAnnealingIR") -> "QdAnnealingIR":
        """Return the product of two models.

        Every term of ``self`` is multiplied with every term of ``other``.
        Repeated variables inside a product collapse to one occurrence, and
        products over the same set of variables are accumulated under a
        single key regardless of variable order.  A product that involves a
        single variable ``x`` is stored under ``(x, x)``.
        """
        product: dict = {}
        # Maps the set of variables of a term to the key it is stored under,
        # so an equivalent term is found without rescanning every key.
        key_for_vars: dict = {}
        for k1, v1 in self.qubo.items():
            for k2, v2 in other.qubo.items():
                # dict.fromkeys de-duplicates while keeping first-seen order,
                # which makes the generated key deterministic.
                names = tuple(dict.fromkeys(k1 + k2))
                var_set = frozenset(names)
                coeff = v1 * v2
                if var_set in key_for_vars:
                    product[key_for_vars[var_set]] += coeff
                    continue
                key = (names[0], names[0]) if len(names) == 1 else names
                key_for_vars[var_set] = key
                product[key] = coeff
        return QdAnnealingIR(product)

    def __pow__(self, other: int) -> "QdAnnealingIR":
        """Return ``self`` multiplied by itself ``other`` times.

        Raises:
            TypeError: If ``other`` is not an ``int``.
            ValueError: If ``other`` is smaller than 1.
        """
        if not isinstance(other, int):
            raise TypeError(f"{type(other)}は対応していない型です。")
        if other < 1:
            raise ValueError("指数は1以上の整数である必要があります。")
        # Start from a copy so the result never shares its dict with self.
        result = QdAnnealingIR(dict(self.qubo))
        for _ in range(1, other):
            result = result * self
        return result

    # ------------------------------
    # Conversion (from_*)
    # ------------------------------
    def from_pulp(self, prob: "LpProblem") -> "QdAnnealingIR":  # type: ignore[name-defined]
        """Load the objective of a PuLP problem as diagonal QUBO terms.

        Raises:
            TypeError: If ``prob`` is not an ``LpProblem``.
        """
        from pulp import LpProblem  # local import: optional backend

        if not isinstance(prob, LpProblem):
            raise TypeError(f"{type(prob)}は対応していない型です。")
        self.qubo = {
            (var["name"], var["name"]): var["value"]
            for var in prob.objective.to_dict()
        }
        return self

    def from_amplify(self, prob: "Poly") -> "QdAnnealingIR":  # type: ignore[name-defined]
        """Load an amplify ``Poly``; zero-variable entries are skipped.

        Raises:
            TypeError: If ``prob`` is not a ``Poly``.
            ValueError: If a term involves three or more variables.
        """
        from amplify import Poly  # type: ignore

        if not isinstance(prob, Poly):
            raise TypeError(f"{type(prob)}は対応していない型です。")
        variables = prob.variables
        qubo: dict = {}
        for key, value in prob.as_dict().items():
            degree = len(key)
            if degree == 0:
                continue  # constant term
            if degree > 2:
                raise ValueError("3変数以上は対応していません。")
            first = variables[key[0]].name
            # A single-variable entry is stored on the diagonal.
            second = first if degree == 1 else variables[key[1]].name
            qubo[(first, second)] = value
        self.qubo = qubo
        return self

    def from_pyqubo(self, prob: "Base") -> "QdAnnealingIR":  # type: ignore[name-defined]
        """Compile a pyqubo expression and keep the first item of ``to_qubo()``.

        Raises:
            TypeError: If ``prob`` is not a pyqubo ``Base``.
        """
        from pyqubo import Base  # type: ignore

        if not isinstance(prob, Base):
            raise TypeError(f"{type(prob)}は対応していない型です。")
        self.qubo = prob.compile().to_qubo()[0]
        return self

    def from_array(self, prob: "np.ndarray") -> "QdAnnealingIR":  # type: ignore[name-defined]
        """Load a 2-D array; entry ``[i][j]`` belongs to ``("q_i", "q_j")``.

        Zero entries are skipped.  If the mirrored key ``("q_j", "q_i")``
        already exists, the value is added to it instead of creating a new
        key.

        Raises:
            TypeError: If ``prob`` is not a ``numpy.ndarray``.
        """
        import numpy as np  # noqa

        if not isinstance(prob, np.ndarray):
            raise TypeError(f"{type(prob)}は対応していない型です。")
        qubo: dict = {}
        for i, row in enumerate(prob):
            for j, value in enumerate(row):
                if value == 0:
                    continue
                mirrored = (f"q_{j}", f"q_{i}")
                if mirrored in qubo:
                    qubo[mirrored] += value
                else:
                    qubo[(f"q_{i}", f"q_{j}")] = value
        self.qubo = qubo
        return self

    def from_csv(self, path: str, encoding: str = "utf-8-sig") -> "QdAnnealingIR":
        """Load a CSV matrix using the same layout rules as ``from_array``.

        Every cell is parsed with ``float``.

        Raises:
            ValueError: If the file cannot be opened or parsed; the original
                exception is chained as the cause.
        """
        import csv  # local import

        try:
            with open(path, encoding=encoding, newline="") as f:
                qubo: dict = {}
                for i, row in enumerate(csv.reader(f)):
                    for j, cell in enumerate(row):
                        value = float(cell)
                        if value == 0:
                            continue
                        mirrored = (f"q_{j}", f"q_{i}")
                        if mirrored in qubo:
                            qubo[mirrored] += value
                        else:
                            qubo[(f"q_{i}", f"q_{j}")] = value
                self.qubo = qubo
                return self
        except Exception as e:
            raise ValueError("読み取りエラー") from e

    def from_json(self, path: str) -> "QdAnnealingIR":
        """Load a JSON file of the form ``{"qubo": [{"key": [a, b], "value": v}, ...]}``.

        Raises:
            ValueError: If the file cannot be opened or does not have the
                expected structure; the original exception is chained.
        """
        import json  # local import

        try:
            # Explicit UTF-8 so the result does not depend on the locale.
            with open(path, encoding="utf-8") as f:
                entries = json.load(f)["qubo"]
                self.qubo = {
                    (q["key"][0], q["key"][1]): q["value"] for q in entries
                }
                return self
        except Exception as e:
            raise ValueError("読み取りエラー") from e

    def from_networkx(self, prob: "nx.Graph") -> "QdAnnealingIR":  # type: ignore[name-defined]
        """Load a graph: each edge ``(u, v)`` adds 1 to ``("q_u", "q_v")``.

        Raises:
            TypeError: If ``prob`` is not a ``networkx.Graph``.
        """
        import networkx as nx  # noqa

        if not isinstance(prob, nx.Graph):
            raise TypeError(f"{type(prob)}は対応していない型です。")
        qubo: dict = {}
        for e in prob.edges():
            key = (f"q_{e[0]}", f"q_{e[1]}")
            if key in qubo:
                qubo[key] += 1
            else:
                qubo[key] = 1
        self.qubo = qubo
        return self

    def from_pandas(self, prob: "pd.DataFrame") -> "QdAnnealingIR":  # type: ignore[name-defined]
        """Load a DataFrame; the cell at (row, column) belongs to ``(row, column)``.

        This is the same orientation that ``to_pandas`` writes, so a
        ``to_pandas`` / ``from_pandas`` round trip preserves the keys.
        Zero cells are skipped.

        Raises:
            TypeError: If ``prob`` is not a ``pandas.DataFrame``.
        """
        import pandas as pd  # noqa

        if not isinstance(prob, pd.DataFrame):
            raise TypeError(f"{type(prob)}は対応していない型です。")
        row_labels = prob.index.tolist()
        column_labels = prob.columns.tolist()
        qubo: dict = {}
        for row in row_labels:
            for column in column_labels:
                value = prob.at[row, column]
                if value == 0:
                    continue
                key = (row, column)
                if key in qubo:
                    qubo[key] += value
                else:
                    qubo[key] = value
        self.qubo = qubo
        return self

    def from_dimod_bqm(self, prob: "dimod.BinaryQuadraticModel") -> "QdAnnealingIR":  # type: ignore[name-defined]
        """Load a dimod BQM: quadratic terms as-is, non-zero linear terms on the diagonal.

        Raises:
            TypeError: If ``prob`` is not a ``dimod.BinaryQuadraticModel``.
        """
        import dimod  # noqa

        if not isinstance(prob, dimod.BinaryQuadraticModel):
            raise TypeError(f"{type(prob)}は対応していない型です。")
        qubo = dict(prob.quadratic)
        for k, v in prob.linear.items():
            if v == 0:
                continue
            qubo[(k, k)] = v
        self.qubo = qubo
        return self

    def from_sympy(self, prob: "sympy.core.expr.Expr") -> "QdAnnealingIR":  # type: ignore[name-defined]
        """Load a sympy expression term by term.

        A term with exactly one factor is stored on the diagonal under the
        name of one of its free symbols; any other term is keyed by the
        string form of each factor.

        Raises:
            TypeError: If ``prob`` is not a sympy ``Expr``.
        """
        import sympy  # noqa

        if not isinstance(prob, sympy.core.expr.Expr):
            raise TypeError(f"{type(prob)}は対応していない型です。")
        qubo: dict = {}
        for term in prob.as_ordered_terms():
            coeff, factors = term.as_coeff_mul()
            if len(factors) == 1:
                name = str(list(term.free_symbols)[0])
                qubo[(name, name)] = coeff
            else:
                # NOTE(review): a term without symbolic factors (presumably a
                # constant) ends up under the empty key () -- confirm whether
                # it should be skipped the way from_amplify skips constants.
                qubo[tuple(str(factor) for factor in factors)] = coeff
        self.qubo = qubo
        return self

    # ------------------------------
    # Conversion (to_*)
    # ------------------------------
    def to_pulp(self):
        """Build a PuLP minimisation problem from the diagonal terms.

        Raises:
            ValueError: If the model contains an off-diagonal key.
        """
        from pulp import LpVariable, LpProblem, LpMinimize  # local import

        names = list(set(k for key in self.qubo.keys() for k in key))
        lp_vars = {
            name: LpVariable(name, lowBound=0, upBound=1, cat='Binary')
            for name in names
        }
        qubo_prob = LpProblem('QUBO', LpMinimize)
        objective = 0
        for key, value in self.qubo.items():
            if key[0] != key[1]:
                raise ValueError("pulpは2変数以上に対応していません。")
            objective += lp_vars[key[0]] * value
        qubo_prob += objective
        return qubo_prob

    def to_amplify(self):
        """Build an amplify polynomial over binary variables named after the keys."""
        from amplify import VariableGenerator  # type: ignore

        variables = sorted(set(k for key in self.qubo.keys() for k in key))
        gen = VariableGenerator()
        labeled_q = {
            str(name): gen.scalar("Binary", name=str(name)) for name in variables
        }
        qubo_poly = 0
        for key, value in self.qubo.items():
            sub_qubo = 1
            for k in key:
                # The table is keyed by str(name), so look up the same way.
                sub_qubo *= labeled_q[str(k)]
            qubo_poly += sub_qubo * value
        return qubo_poly

    def to_pyqubo(self):
        """Build a pyqubo expression with one ``Binary`` per variable."""
        from pyqubo import Binary  # type: ignore

        names = set(k for key in self.qubo.keys() for k in key)
        binaries = {name: Binary(str(name)) for name in names}
        qubo_expr = 0
        for key, value in self.qubo.items():
            sub_qubo = 1
            for k in key:
                sub_qubo *= binaries[k]
            qubo_expr += sub_qubo * value
        return qubo_expr

    def to_array(self):
        """Return the model as a square float array.

        Rows and columns follow ``sorted`` order of the variable names, so
        string names are ordered lexicographically (``"q_10"`` comes before
        ``"q_2"``).  Key ``(a, b)`` is written to row ``a``, column ``b``.

        Raises:
            ValueError: If a key does not have exactly two elements.
        """
        import numpy as np  # noqa

        variables = sorted(set(k for key in self.qubo.keys() for k in key))
        position = {name: i for i, name in enumerate(variables)}
        qubo_arr = np.zeros((len(variables), len(variables)))
        for key, value in self.qubo.items():
            if len(key) != 2:
                raise ValueError("matrixは3変数以上に対応していません。")
            qubo_arr[position[key[0]], position[key[1]]] = value
        return qubo_arr

    def to_csv(self, name: str = "qudata") -> None:
        """Write the ``to_array`` matrix to ``"{name}.csv"``.

        Raises:
            ValueError: If writing fails; the original exception is chained.
                Errors from ``to_array`` itself propagate unchanged.
        """
        import csv  # noqa

        qubo_arr = self.to_array()
        try:
            # newline="" is what the csv module expects; without it extra
            # blank rows can appear on platforms that translate newlines.
            with open(f"{name}.csv", 'w', encoding="utf-8", newline="") as f:
                csv.writer(f).writerows(qubo_arr)
        except Exception as e:
            raise ValueError("書き出しエラー") from e

    @staticmethod
    def _json_default(value):
        """Turn scalar wrappers exposing ``item()`` into plain Python values.

        Used as the ``default`` hook of ``json.dump`` so that NumPy scalar
        coefficients can be serialised.
        """
        item = getattr(value, "item", None)
        if callable(item):
            converted = item()
            # Guard against objects whose item() returns the same type,
            # which would make json call this hook forever.
            if type(converted) is not type(value):
                return converted
        raise TypeError(f"{type(value)}は対応していない型です。")

    def to_json(self, name: str = "qudata") -> None:
        """Write the model to ``"{name}.json"`` as ``{"qubo": [{"key", "value"}, ...]}``.

        Raises:
            ValueError: If serialising or writing fails; the original
                exception is chained.
        """
        import json  # noqa

        qubo_json = [
            {"key": list(key), "value": value} for key, value in self.qubo.items()
        ]
        try:
            with open(f"{name}.json", 'w', encoding="utf-8") as f:
                json.dump(
                    {"qubo": qubo_json}, f, indent=4, default=self._json_default
                )
        except Exception as e:
            raise ValueError("書き出しエラー") from e

    def to_networkx(self):
        """Return a graph with one node per variable and one weighted edge per off-diagonal pair."""
        import networkx as nx  # noqa

        variables = list(set(k for key in self.qubo.keys() for k in key))
        G = nx.Graph()
        G.add_nodes_from(variables)
        for key, value in self.qubo.items():
            # Diagonal terms have no edge; only the node is kept.
            if len(key) == 2 and key[0] != key[1]:
                G.add_edge(key[0], key[1], weight=value)
        return G

    def to_pandas(self):
        """Return a square DataFrame; key ``(a, b)`` is written to row ``a``, column ``b``."""
        import pandas as pd  # noqa

        variables = sorted(set(k for key in self.qubo.keys() for k in key))
        df = pd.DataFrame(0.0, index=variables, columns=variables)
        for key, value in self.qubo.items():
            df.loc[key[0], key[1]] = value
        return df

    def to_dimod_bqm(self):
        """Return a binary dimod BQM: diagonal keys become linear terms, the rest quadratic."""
        import dimod  # noqa

        linear = {}
        quadratic = {}
        for key, value in self.qubo.items():
            if key[0] == key[1]:
                linear[key[0]] = value
            else:
                quadratic[(key[0], key[1])] = value
        return dimod.BinaryQuadraticModel(linear, quadratic, 0.0, vartype='BINARY')

    def to_sympy(self):
        """Return a sympy expression built from ``Symbol`` objects named after the keys."""
        import sympy  # noqa

        expr = 0
        for (i, j), value in self.qubo.items():
            if i == j:
                # Diagonal entry: treated as a linear term.
                expr += sympy.Symbol(i) * value
            else:
                # Off-diagonal entry: quadratic term.
                expr += sympy.Symbol(i) * sympy.Symbol(j) * value
        return expr

    # ------------------------------
    # Utilities
    # ------------------------------
    def to_dict(self) -> dict:
        """Return the underlying QUBO dict (the same object, not a copy)."""
        return self.qubo

    @classmethod
    def from_dict(cls, data: dict) -> "QdAnnealingIR":
        """Alternate constructor equivalent to ``cls(qubo=data)``."""
        return cls(qubo=data)

    @property
    def prob(self):
        """Expose ``qubo`` under the name ``prob`` as well (old API name)."""
        return self.qubo

    def __eq__(self, other: object) -> bool:
        """Two IRs are equal when their QUBO dicts are equal."""
        if not isinstance(other, QdAnnealingIR):
            return NotImplemented
        return self.qubo == other.qubo

    # Mapping interface: delegate to the QUBO dict.
    def __len__(self):
        return len(self.qubo)

    def __iter__(self):
        return iter(self.qubo)

    def __getitem__(self, key):
        return self.qubo[key]
# Short alias: QdAnnIR refers to the same class object as QdAnnealingIR.
QdAnnIR = QdAnnealingIR