Coverage for nexios\pagination.py: 81%

198 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-21 20:31 +0100

1import abc 

2import base64 

3import json 

4import urllib.parse 

5from typing import Any, Optional, Dict, List, Tuple, Union 

6 

7 

8class PaginationError(Exception): 

9 """Base class for all pagination errors""" 

10 

11 

12class InvalidPageError(PaginationError): 

13 """Raised when requesting an invalid page number""" 

14 

15 

16class InvalidPageSizeError(PaginationError): 

17 """Raised when requesting an invalid page size""" 

18 

19 

20class InvalidCursorError(PaginationError): 

21 """Raised when providing an invalid cursor""" 

22 

23 

24class LinkBuilder: 

25 def __init__( 

26 self, 

27 base_url: str, 

28 request_params: Dict[str, Union[str, List[str]]], 

29 pagination_params: List[str], 

30 ): 

31 self.base_url = base_url 

32 self.request_params = request_params 

33 self.pagination_params = pagination_params 

34 

35 def build_link(self, new_params: Dict[str, Any]) -> str: 

36 filtered_params: Dict[str, Any] = { 

37 k: v 

38 for k, v in self.request_params.items() 

39 if k not in self.pagination_params 

40 } 

41 merged_params = {**filtered_params, **new_params} 

42 return f"{self.base_url}?{urllib.parse.urlencode(merged_params, doseq=True)}" 

43 

44 

45class BasePaginationStrategy(abc.ABC): 

46 @abc.abstractmethod 

47 def parse_parameters(self, request_params: Dict[str, Any]) -> Any: 

48 pass 

49 

50 @abc.abstractmethod 

51 def calculate_offset_limit(self, *args: List[int]) -> Tuple[int, int]: 

52 pass 

53 

54 @abc.abstractmethod 

55 def generate_metadata( 

56 self, 

57 total_items: int, 

58 items: List[Any], 

59 base_url: str, 

60 request_params: Dict[str, Any], 

61 ) -> Dict[str, Any]: 

62 pass 

63 

64 

65class SyncDataHandler(abc.ABC): 

66 @abc.abstractmethod 

67 def get_total_items(self) -> int: 

68 pass 

69 

70 @abc.abstractmethod 

71 def get_items(self, offset: int, limit: int) -> List[Any]: 

72 pass 

73 

74 

75class SyncListDataHandler(SyncDataHandler): 

76 def __init__(self, data: List[Any]): 

77 self.data = data 

78 

79 def get_total_items(self) -> int: 

80 return len(self.data) 

81 

82 def get_items(self, offset: int, limit: int) -> List[Any]: 

83 return self.data[offset : offset + limit] 

84 

85 

86class AsyncDataHandler(abc.ABC): 

87 @abc.abstractmethod 

88 async def get_total_items(self) -> int: 

89 pass 

90 

91 @abc.abstractmethod 

92 async def get_items(self, offset: int, limit: int) -> List[Any]: 

93 pass 

94 

95 

96class AsyncListDataHandler(AsyncDataHandler): 

97 def __init__(self, data: List[Any]): 

98 self.data = data 

99 

100 async def get_total_items(self) -> int: 

101 return len(self.data) 

102 

103 async def get_items(self, offset: int, limit: int) -> List[Any]: 

104 return self.data[offset : offset + limit] 

105 

106 

107# ==================== PAGINATION STRATEGIES ==================== 

108 

109 

110class PageNumberPagination(BasePaginationStrategy): 

111 def __init__( 

112 self, 

113 page_param: str = "page", 

114 page_size_param: str = "page_size", 

115 default_page: int = 1, 

116 default_page_size: int = 20, 

117 max_page_size: int = 100, 

118 ): 

119 self.page_param = page_param 

120 self.page_size_param = page_size_param 

121 self.default_page = default_page 

122 self.default_page_size = default_page_size 

123 self.max_page_size = max_page_size 

124 

125 def parse_parameters(self, request_params: Dict[str, Any]) -> Tuple[int, int]: 

126 page = int(request_params.get(self.page_param, self.default_page)) 

127 page_size = int( 

128 request_params.get(self.page_size_param, self.default_page_size) 

129 ) 

130 

131 if page_size > self.max_page_size: 

132 page_size = self.max_page_size 

133 

134 if page < 1: 

135 raise InvalidPageError("Page number must be at least 1") 

136 if page_size < 1: 

137 raise InvalidPageSizeError("Page size must be at least 1") 

138 

139 return page, page_size 

140 

141 def calculate_offset_limit(self, page: int, page_size: int) -> Tuple[int, int]: # type: ignore 

142 return (page - 1) * page_size, page_size 

143 

144 def generate_metadata( 

145 self, 

146 total_items: int, 

147 items: List[Any], 

148 base_url: str, 

149 request_params: Dict[str, Any], 

150 ) -> Dict[str, Any]: 

151 page, page_size = self.parse_parameters(request_params) 

152 total_pages = (total_items + page_size - 1) // page_size if page_size else 1 

153 

154 link_builder = LinkBuilder( 

155 base_url, request_params, [self.page_param, self.page_size_param] 

156 ) 

157 

158 links = {} 

159 

160 if page > 1: 

161 links["prev"] = link_builder.build_link( 

162 {self.page_param: page - 1, self.page_size_param: page_size} 

163 ) 

164 if page < total_pages: 

165 links["next"] = link_builder.build_link( 

166 {self.page_param: page + 1, self.page_size_param: page_size} 

167 ) 

168 

169 links["first"] = link_builder.build_link( 

170 {self.page_param: 1, self.page_size_param: page_size} 

171 ) 

172 links["last"] = link_builder.build_link( 

173 {self.page_param: total_pages, self.page_size_param: page_size} 

174 ) 

175 

176 return { 

177 "total_items": total_items, 

178 "total_pages": total_pages, 

179 "page": page, 

180 "page_size": page_size, 

181 "links": links, 

182 } 

183 

184 

185class LimitOffsetPagination(BasePaginationStrategy): 

186 def __init__( 

187 self, 

188 limit_param: str = "limit", 

189 offset_param: str = "offset", 

190 default_limit: int = 20, 

191 max_limit: int = 100, 

192 ): 

193 self.limit_param = limit_param 

194 self.offset_param = offset_param 

195 self.default_limit = default_limit 

196 self.max_limit = max_limit 

197 

198 def parse_parameters(self, request_params: Dict[str, Any]) -> Tuple[int, int]: 

199 limit = int(request_params.get(self.limit_param, self.default_limit)) 

200 offset = int(request_params.get(self.offset_param, 0)) 

201 

202 if limit > self.max_limit: 

203 limit = self.max_limit 

204 if limit < 0: 

205 raise InvalidPageSizeError("Limit cannot be negative") 

206 if offset < 0: 

207 raise InvalidPageError("Offset cannot be negative") 

208 

209 return limit, offset 

210 

211 def calculate_offset_limit(self, limit: int, offset: int) -> Tuple[int, int]: # type: ignore 

212 return offset, limit 

213 

214 def generate_metadata( 

215 self, 

216 total_items: int, 

217 items: List[Any], 

218 base_url: str, 

219 request_params: Dict[str, Any], 

220 ) -> Dict[str, Any]: 

221 limit, offset = self.parse_parameters(request_params) 

222 current_page = (offset // limit) + 1 if limit else 1 

223 total_pages = (total_items + limit - 1) // limit if limit else 1 

224 

225 link_builder = LinkBuilder( 

226 base_url, request_params, [self.limit_param, self.offset_param] 

227 ) 

228 

229 links = {} 

230 

231 if offset > 0: 

232 prev_offset = max(0, offset - limit) 

233 links["prev"] = link_builder.build_link( 

234 {self.limit_param: limit, self.offset_param: prev_offset} 

235 ) 

236 

237 if offset + limit < total_items: 

238 next_offset = offset + limit 

239 links["next"] = link_builder.build_link( 

240 {self.limit_param: limit, self.offset_param: next_offset} 

241 ) 

242 

243 links["first"] = link_builder.build_link( 

244 {self.limit_param: limit, self.offset_param: 0} 

245 ) 

246 links["last"] = link_builder.build_link( 

247 {self.limit_param: limit, self.offset_param: max(0, total_items - limit)} 

248 ) 

249 

250 return { 

251 "total_items": total_items, 

252 "limit": limit, 

253 "offset": offset, 

254 "current_page": current_page, 

255 "total_pages": total_pages, 

256 "links": links, 

257 } 

258 

259 

260class CursorPagination(BasePaginationStrategy): 

261 def __init__( 

262 self, 

263 cursor_param: str = "cursor", 

264 page_size_param: str = "page_size", 

265 default_page_size: int = 20, 

266 max_page_size: int = 100, 

267 sort_field: str = "id", 

268 ): 

269 self.cursor_param = cursor_param 

270 self.page_size_param = page_size_param 

271 self.default_page_size = default_page_size 

272 self.max_page_size = max_page_size 

273 self.sort_field = sort_field 

274 

275 def parse_parameters( 

276 self, request_params: Dict[str, Any] 

277 ) -> Tuple[Optional[str], int]: 

278 cursor = request_params.get(self.cursor_param) 

279 page_size = int( 

280 request_params.get(self.page_size_param, self.default_page_size) 

281 ) 

282 page_size = min(page_size, self.max_page_size) 

283 return cursor, page_size 

284 

285 def decode_cursor(self, cursor: str) -> Dict[str, Any]: 

286 try: 

287 decoded = base64.b64decode(cursor).decode("utf-8") 

288 return json.loads(decoded) 

289 except (json.JSONDecodeError, UnicodeDecodeError): 

290 raise InvalidCursorError("Invalid cursor format") 

291 

292 def encode_cursor(self, value: Any) -> str: 

293 cursor_data = {self.sort_field: value} 

294 return base64.b64encode(json.dumps(cursor_data).encode("utf-8")).decode("utf-8") 

295 

296 def calculate_offset_limit( # type:ignore 

297 self, cursor: Optional[str], page_size: int 

298 ) -> Tuple[int, int]: # type:ignore 

299 return 0, page_size 

300 

301 def generate_metadata( 

302 self, 

303 total_items: int, 

304 items: List[Any], 

305 base_url: str, 

306 request_params: Dict[str, Any], 

307 ) -> Dict[str, Any]: 

308 cursor, page_size = self.parse_parameters(request_params) 

309 link_builder = LinkBuilder( 

310 base_url, request_params, [self.cursor_param, self.page_size_param] 

311 ) 

312 

313 links = {} 

314 next_cursor = self.encode_cursor(items[-1][self.sort_field]) if items else None 

315 prev_cursor = self.encode_cursor(items[0][self.sort_field]) if items else None 

316 

317 if next_cursor: 

318 links["next"] = link_builder.build_link( 

319 {self.cursor_param: next_cursor, self.page_size_param: page_size} 

320 ) 

321 if cursor: 

322 links["prev"] = link_builder.build_link( 

323 {self.cursor_param: prev_cursor, self.page_size_param: page_size} 

324 ) 

325 

326 return { 

327 "total_items": total_items, 

328 "page_size": page_size, 

329 "cursor": cursor, 

330 "links": links, 

331 } 

332 

333 

334# ==================== PAGINATORS ==================== 

335 

336 

337class SyncPaginator: 

338 def __init__( 

339 self, 

340 data_handler: SyncDataHandler, 

341 pagination_strategy: BasePaginationStrategy, 

342 base_url: str, 

343 request_params: Dict[str, Any], 

344 validate_total_items: bool = True, 

345 ): 

346 self.data_handler = data_handler 

347 self.pagination_strategy = pagination_strategy 

348 self.base_url = base_url 

349 self.request_params = request_params 

350 self.validate_total_items = validate_total_items 

351 

352 def paginate(self) -> Dict[str, Any]: 

353 params = self.pagination_strategy.parse_parameters(self.request_params) 

354 offset, limit = self.pagination_strategy.calculate_offset_limit(*params) 

355 

356 total_items = self.data_handler.get_total_items() 

357 if self.validate_total_items and offset >= total_items and total_items > 0: 

358 raise InvalidPageError("Requested offset exceeds total items") 

359 

360 items = self.data_handler.get_items(offset, limit) 

361 metadata = self.pagination_strategy.generate_metadata( 

362 total_items, items, self.base_url, self.request_params 

363 ) 

364 

365 return {"items": items, "pagination": metadata} 

366 

367 

368class AsyncPaginator: 

369 def __init__( 

370 self, 

371 data_handler: AsyncDataHandler, 

372 pagination_strategy: BasePaginationStrategy, 

373 base_url: str, 

374 request_params: Dict[str, Any], 

375 validate_total_items: bool = True, 

376 ): 

377 self.data_handler = data_handler 

378 self.pagination_strategy = pagination_strategy 

379 self.base_url = base_url 

380 self.request_params = request_params 

381 self.validate_total_items = validate_total_items 

382 

383 async def paginate(self) -> Dict[str, Any]: 

384 params = self.pagination_strategy.parse_parameters(self.request_params) 

385 offset, limit = self.pagination_strategy.calculate_offset_limit(*params) 

386 

387 total_items = await self.data_handler.get_total_items() 

388 if self.validate_total_items and offset >= total_items and total_items > 0: 

389 raise InvalidPageError("Requested offset exceeds total items") 

390 

391 items = await self.data_handler.get_items(offset, limit) 

392 metadata = self.pagination_strategy.generate_metadata( 

393 total_items, items, self.base_url, self.request_params 

394 ) 

395 

396 return {"items": items, "pagination": metadata} 

397 

398 

399class PaginatedResponse: 

400 def __init__(self, data: Dict[str, Any]): 

401 self.items = data["items"] 

402 self.metadata = data["pagination"] 

403 

404 def to_dict(self) -> Dict[str, Any]: 

405 return {"data": self.items, "pagination": self.metadata} 

406 

407 

408class AsyncPaginatedResponse: 

409 def __init__(self, data: Dict[str, Any]): 

410 self.items = data["items"] 

411 self.metadata = data["pagination"] 

412 

413 def to_dict(self) -> Dict[str, Any]: 

414 return {"data": self.items, "pagination": self.metadata}