Coverage for src/django_global_search/searcher.py: 95%

116 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-06 22:25 +0000

1"""Global Search - Search logic.""" 

2 

3from __future__ import annotations 

4 

5import time 

6from collections import defaultdict 

7from dataclasses import dataclass 

8from typing import TYPE_CHECKING 

9from urllib.parse import urlencode 

10 

11from django.apps import apps 

12from django.contrib.admin.sites import AdminSite 

13from django.contrib.contenttypes.models import ContentType 

14from django.db.models import Model 

15from django.urls import reverse 

16 

17from django_global_search.admin import GlobalSearchAdminSiteMixin 

18from django_global_search.permissions import filter_searchable_models 

19from django_global_search.settings import GlobalSearchAdminSiteSettings 

20 

21if TYPE_CHECKING: 

22 from django.contrib.admin import ModelAdmin 

23 from django.http import HttpRequest 

24 

25 

26@dataclass(frozen=True) 

27class SearchResultItem: 

28 """Search result item.""" 

29 

30 url: str 

31 display_text: str 

32 

33 

34@dataclass(frozen=True) 

35class ModelSearchResult: 

36 """Search results for a specific model.""" 

37 

38 content_type_id: int 

39 model_name: str 

40 verbose_name: str 

41 verbose_name_plural: str 

42 items: list[SearchResultItem] 

43 has_more: bool 

44 changelist_url: str | None = None 

45 

46 

47@dataclass(frozen=True) 

48class AppSearchResult: 

49 """Search results for an app.""" 

50 

51 app_label: str 

52 app_verbose_name: str 

53 models: list[ModelSearchResult] 

54 

55 

56@dataclass(frozen=True) 

57class GlobalSearchResult: 

58 """Global search result container.""" 

59 

60 apps: list[AppSearchResult] 

61 elapsed_time_ms: int 

62 is_timeout: bool = False 

63 

64 

65class GlobalSearch: 

66 """Global Search class.""" 

67 

68 def __init__(self, admin_site: AdminSite): 

69 """Initialize global search. 

70 

71 :param admin_site: AdminSite instance with GlobalSearchAdminSiteMixin 

72 :raises TypeError: If admin_site doesn't inherit GlobalSearchAdminSiteMixin 

73 """ 

74 if not isinstance(admin_site, GlobalSearchAdminSiteMixin): 

75 raise TypeError( # noqa: TRY003 

76 f"admin_site must inherit GlobalSearchAdminSiteMixin, got {type(admin_site)}" 

77 ) 

78 

79 self.admin_site = admin_site 

80 self.settings: GlobalSearchAdminSiteSettings = admin_site.get_global_search_settings() 

81 

82 def search( 

83 self, 

84 request: HttpRequest, 

85 query: str, 

86 content_type_ids: list[int] | None = None, 

87 ) -> GlobalSearchResult: 

88 """Execute search. 

89 

90 :param request: Request object 

91 :param query: Search query string 

92 :param content_type_ids: Optional list of content type IDs to filter 

93 :raises ValueError: If query is too short 

94 """ 

95 # Validate and normalize query 

96 query = query.strip() 

97 min_query_length = self.settings.min_query_length 

98 if len(query) < min_query_length: 

99 raise ValueError(f"Query must be at least {min_query_length} characters") # noqa: TRY003 

100 

101 start_time = time.perf_counter() 

102 timeout_seconds = self.settings.search_timeout_ms / 1000.0 

103 

104 # Get searchable model admins 

105 model_admins = self.get_searchable_model_admins(request, content_type_ids) 

106 

107 # Group results by app_label 

108 search_results_by_app_label: dict[str, list[ModelSearchResult]] = defaultdict(list) 

109 

110 for model_admin in model_admins: 

111 # Check timeout 

112 elapsed = time.perf_counter() - start_time 

113 if elapsed > timeout_seconds: 

114 # Return empty result on timeout for accuracy 

115 elapsed_ms = int(elapsed * 1000) 

116 return GlobalSearchResult( 

117 apps=[], 

118 elapsed_time_ms=elapsed_ms, 

119 is_timeout=True, 

120 ) 

121 

122 model = model_admin.model 

123 content_type = ContentType.objects.get_for_model(model) 

124 model_search_result = self._search_model(request, model_admin, content_type, query) 

125 

126 if model_search_result: 

127 app_label = model._meta.app_label 

128 search_results_by_app_label[app_label].append(model_search_result) 

129 

130 # Build app results 

131 app_results = [] 

132 for app_label in sorted(search_results_by_app_label.keys()): 

133 models = search_results_by_app_label[app_label] 

134 app_config = apps.get_app_config(app_label) 

135 app_results.append( 

136 AppSearchResult( 

137 app_label=app_label, 

138 app_verbose_name=app_config.verbose_name, 

139 models=models, 

140 ) 

141 ) 

142 

143 elapsed_ms = int((time.perf_counter() - start_time) * 1000) 

144 

145 return GlobalSearchResult( 

146 apps=app_results, 

147 elapsed_time_ms=elapsed_ms, 

148 is_timeout=False, 

149 ) 

150 

151 def get_searchable_model_admins( 

152 self, request: HttpRequest, content_type_ids: list[int] | None = None 

153 ) -> list[ModelAdmin]: 

154 """Get list of searchable ModelAdmin instances.""" 

155 return filter_searchable_models( 

156 request=request, 

157 admin_registry=self.admin_site._registry, 

158 excluded_models=self.settings.excluded_models, 

159 content_type_ids=content_type_ids, 

160 ) 

161 

162 def _search_model( 

163 self, 

164 request: HttpRequest, 

165 model_admin: ModelAdmin, 

166 ct: ContentType, 

167 query: str, 

168 ) -> ModelSearchResult | None: 

169 """Search in a specific model using ModelAdmin's search configuration.""" 

170 model = model_admin.model 

171 

172 # Get base queryset with permissions applied 

173 queryset = model_admin.get_queryset(request) 

174 

175 # Use Django admin's built-in search 

176 queryset, use_distinct = model_admin.get_search_results(request, queryset, query) 

177 

178 # Apply distinct if needed 

179 if use_distinct: 

180 queryset = queryset.distinct() 

181 

182 # Apply ordering for consistent results 

183 ordering = model_admin.get_ordering(request) 

184 if ordering: 

185 queryset = queryset.order_by(*ordering) 

186 

187 # Efficient count+slice: fetch N+1 to check has_more 

188 max_results = self.settings.max_results_per_model 

189 results = list(queryset[: max_results + 1]) 

190 

191 if not results: 

192 return None 

193 

194 has_more = len(results) > max_results 

195 if has_more: 

196 results = results[:max_results] 

197 

198 # Build result items with permission check 

199 result_items = [] 

200 for obj in results: 

201 # Check object level view permission 

202 if not model_admin.has_view_permission(request, obj): 

203 continue 

204 

205 url = self._get_object_url(obj) 

206 display_text = str(obj) 

207 result_items.append(SearchResultItem(url=url, display_text=display_text)) 

208 

209 if not result_items: 

210 return None 

211 

212 # Get changelist URL 

213 changelist_url = self._get_changelist_url(model_admin, query) 

214 

215 return ModelSearchResult( 

216 content_type_id=ct.id, 

217 model_name=model._meta.model_name, 

218 verbose_name=str(model._meta.verbose_name), 

219 verbose_name_plural=str(model._meta.verbose_name_plural), 

220 items=result_items, 

221 has_more=has_more, 

222 changelist_url=changelist_url, 

223 ) 

224 

225 def _get_object_url(self, obj: Model) -> str: 

226 """Get admin change URL for object.""" 

227 admin_site_name = self.admin_site.name 

228 app_label = obj._meta.app_label 

229 model_name = obj._meta.model_name 

230 

231 return reverse( 

232 f"admin:{app_label}_{model_name}_change", 

233 args=[obj.pk], 

234 current_app=admin_site_name, 

235 ) 

236 

237 def _get_changelist_url(self, model_admin: ModelAdmin, query: str) -> str | None: 

238 """Get admin changelist URL with search query.""" 

239 model = model_admin.model 

240 

241 admin_site_name = self.admin_site.name 

242 app_label = model._meta.app_label 

243 model_name = model._meta.model_name 

244 viewname = f"admin:{app_label}_{model_name}_changelist" 

245 

246 try: 

247 base_url = reverse(viewname, current_app=admin_site_name) 

248 except Exception: 

249 return None 

250 else: 

251 query_string = urlencode({"q": query}) 

252 return f"{base_url}?{query_string}"