import json
import logging
import mimetypes
import time
from urllib.parse import (
    quote,
    urlparse,
)

import requests
from bs4 import BeautifulSoup

logging.basicConfig(
    filename="error.log",
    level=logging.INFO,
    format="%(asctime)s | [%(levelname)s]: %(message)s",
    datefmt="%m-%d-%Y / %I:%M:%S %p",
)


class SearchResults:
    def __init__(self, results):
        self.results = results

    def __str__(self):
        output = ""
        for result in self.results:
            output += "---\n"
            output += f"Title: {result.get('title', 'Title not found')}\n"
            output += f"Link: {result.get('link', 'Link not found')}\n"
            output += "---\n"
        return output
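
# Printing a SearchResults instance yields one block per result, for
# example (values are illustrative):
#
#   ---
#   Title: Example page title
#   Link: https://example.com/page
#   ---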


class YandexReverseImageSearcher:
    def __init__(self):
        self.base_url = "https://yandex.ru/images/search"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",  # noqa: E501
        }
        self.retry_count = 3
        self.retry_delay = 1

    def response(
        self,
        query: str,
        image_url: str,
        max_results: int = 10,
        delay: int = 1,
    ) -> SearchResults:
        self._validate_input(query, image_url)

        encoded_query = quote(query)
        # Encode the whole image URL (including "/" and ":") so it is
        # safe to embed as a query-string value.
        encoded_image_url = quote(image_url, safe="")

        url = f"{self.base_url}?q={encoded_query}&image_url={encoded_image_url}&sbisrc=cr_1_5_2"  # noqa: E501

        all_results = []
        start_index = 0

        while len(all_results) < max_results:
            if start_index != 0:
                time.sleep(delay)

            paginated_url = f"{url}&start={start_index}"

            response = self._make_request(paginated_url)
            if response is None:
                break

            search_results, valid_content = self._parse_search_results(
                response.text,
            )
            if not valid_content:
                logging.warning("Unexpected HTML structure encountered.")
                break

            added_this_page = 0
            for result in search_results:
                if len(all_results) >= max_results:
                    break
                data = self._extract_result_data(result)
                if data and data not in all_results:
                    all_results.append(data)
                    added_this_page += 1

            # Stop when a page contributes nothing new; otherwise the same
            # start_index would be requested forever.
            if added_this_page == 0:
                break
            start_index = len(all_results)

        if len(all_results) == 0:
            logging.warning(
                f"No results were found for the given query: [{query}], and/or image URL: [{image_url}].",  # noqa: E501
            )
            # Return an empty SearchResults rather than a bare string so
            # the annotated return type always holds.
            return SearchResults([])
        return SearchResults(all_results[:max_results])

    def _validate_input(self, query: str, image_url: str):
        if not query:
            raise ValueError(
                "Query not found. Enter a query and try again.",
            )
        if not image_url:
            raise ValueError(
                "Image URL not found. Enter an image URL and try again.",
            )
        if not self._validate_image_url(image_url):
            raise ValueError(
                "Invalid image URL. Enter a valid image URL and try again.",
            )

    def _validate_image_url(self, url: str) -> bool:
        parsed_url = urlparse(url)
        path = parsed_url.path.lower()
        valid_extensions = (".jpg", ".jpeg", ".png", ".webp")
        return any(path.endswith(ext) for ext in valid_extensions)

    def _make_request(self, url: str):
        attempts = 0
        while attempts < self.retry_count:
            try:
                # A timeout keeps a hung connection from stalling retries
                response = requests.get(url, headers=self.headers, timeout=10)
                if response.headers.get("Content-Type", "").startswith(
                    "text/html",
                ):
                    response.raise_for_status()
                    return response
                else:
                    logging.warning("Non-HTML content received.")
                    return None
            except requests.exceptions.HTTPError as http_err:
                logging.error(f"HTTP error occurred: {http_err}")
                attempts += 1
                time.sleep(self.retry_delay)
            except Exception as err:
                logging.error(f"An error occurred: {err}")
                return None
        return None

    def _parse_search_results(self, html_content: str):
        try:
            soup = BeautifulSoup(html_content, "html.parser")
            # Result blocks are assumed to be divs with class "g"; this
            # selector may need updating if the live page layout differs.
            return soup.find_all("div", class_="g"), True
        except Exception as e:
            logging.error(f"Error parsing HTML content: {e}")
            return None, False

    def _extract_result_data(self, result):
        # Look each element up once instead of twice per field
        anchor = result.find("a", href=True)
        heading = result.find("h3")
        link = anchor["href"] if anchor else None
        title = heading.get_text(strip=True) if heading else None
        return {"link": link, "title": title} if link and title else {}


def get_image_links(page):
    """
    Extracts image URLs from the given HTML page.

    Args:
        page: The HTML content as a string.

    Returns:
        A list of image URLs.
    """
    soup = BeautifulSoup(page, "html.parser")

    # Find the specific section containing image links
    gallery_data = soup.find(
        "div",
        {"class": "cbir-section cbir-section_name_sites"},
    )
    if gallery_data is None:
        return []

    # Find the container of image links
    image_links_container = gallery_data.find("div", {"class": "Root"})
    if image_links_container is None:
        return []

    data_state_raw = image_links_container.get("data-state")
    if data_state_raw is None:
        return []
    data_state = json.loads(data_state_raw)

    # Extract the original-image URL from each listed site
    image_urls = []
    for site in data_state.get("sites", []):
        original_image_url = site.get("originalImage", {}).get("url")
        if original_image_url:
            image_urls.append(original_image_url)

    return image_urls
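
# Illustrative shape of the "data-state" JSON consumed above; only the
# "sites" list and each entry's originalImage.url are relied on, and the
# URL shown is a made-up example:
#
#   {
#       "sites": [
#           {"originalImage": {"url": "https://example.com/full-size.jpg"}}
#       ]
#   }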


def yandex_reverse_image_search(file_path):
    img_search_url = generate_images_search_links(file_path)
    if img_search_url is None:
        return []

    # Simulate a user agent to avoid being blocked
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36",  # noqa: E501
        "Content-Type": "application/json",
    }

    try:
        response = requests.get(img_search_url, headers=headers, timeout=30)
        response.raise_for_status()  # Raise an exception for bad status codes

        # get_image_links parses the markup itself, so pass the raw text
        image_urls = get_image_links(response.text)
        return image_urls

    except requests.exceptions.RequestException as e:
        print(f"Error fetching image: {e}")
        return []


def generate_images_search_links(file_path):
    search_url = "https://yandex.ru/images/search"
    params = {
        "rpt": "imageview",
        "format": "json",
        "request": '{"blocks":[{"block":"b-page_type_search-by-image__link"}]}',  # noqa: E501
    }

    try:
        # Note: "image/jpeg/webp" is not a valid MIME type, so guess one
        # from the file extension instead, falling back to image/jpeg.
        content_type = mimetypes.guess_type(file_path)[0] or "image/jpeg"
        with open(file_path, "rb") as image_file:
            files = {"upfile": ("blob", image_file, content_type)}
            response = requests.post(
                search_url,
                params=params,
                files=files,
                timeout=30,
            )
        response.raise_for_status()
        query_string = json.loads(response.content)["blocks"][0]["params"][
            "url"
        ]
        return search_url + "?" + query_string
    except (requests.exceptions.RequestException, OSError, ValueError, LookupError) as e:  # noqa: E501
        print(f"Error generating search URL: {e}")
        return None
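
# Illustrative shape of the upload response unpacked above; only
# blocks[0].params.url is used, and the query string shown is a made-up
# example:
#
#   {"blocks": [{"params": {"url": "rpt=imageview&cbir_id=123%2Fabc"}}]}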


if __name__ == "__main__":
    file_path = "T:\\Projects\\prj-nict-ai-content-detection\\data\\test_data\\towels.jpg.webp"  # noqa: E501
    image_urls = yandex_reverse_image_search(file_path)
    for image_url in image_urls:
        print(f"Image URL: {image_url}")