Пришел запрос на парсинг интернет-магазина vseinstrumenti.ru. Магазин является лидером по продажам электроинструмента, садовой техники и строительного оборудования. Нас же будет интересовать группа "Сварочные столы".
Алгоритм наших действий будет следующим:
1. спарсим все ссылки на товары
2. спарсим сами товары и картинки к ним, попутно подготовим данные для создания фильтра в категории на сайте заказчика.
Товаров в категории не много, поэтому особо заморачиваться с автоматизацией не будем.
Я пользуюсь своими готовыми модулями, скрашивающими рутинные задачи в каждом скрипте по сбору данных:
1. csv_handler.py - набор методов по созданию, записи и чтению csv-файлов;
2. get_data_from_website.py - методы запросов к URL и получения HTML, скачивания файлов
3. images_handler.py - обработчик картинок (обрезка, изменение форматов, нанесение водяных знаков).
В нашем скрипте будем пользоваться этой библиотекой, чтобы ускорить разработку.
Основной нюанс, на который нужно обратить внимание в скрипте link_collector.py - как формируется ссылка при переходе на вторую страницу категории. Остальное все стандартно.
from lib.get_data_from_website import GetData
from lib.csv_handler import CsvHandler
from bs4 import BeautifulSoup
import os
import re
links_filename = 'product_links.csv'
def main() -> None:
max_page = 2
csv_file = CsvHandler(links_filename)
if not os.path.isfile(links_filename):
csv_file.create_headers_csv_semicolon(['code', 'link'])
i = 1
while i <= max_page:
if i == 1:
url = 'https://nn.vseinstrumenti.ru/instrument/svarochnoe-oborudovanie/svarochnye-stoly/'
else:
url = f'https://nn.vseinstrumenti.ru/instrument/svarochnoe-oborudovanie/svarochnye-stoly/page{i}/'
html = GetData(url).get_html()
urls = parse_urls(html)
for src in urls:
csv_file.write_to_csv_semicolon(src)
i += 1
def parse_urls(html: str) -> list:
soup = BeautifulSoup(html, 'html.parser')
try:
title = soup.title.string
except:
title = 'Title отсутствует'
print(title)
urls_list = []
product_list = soup.find_all('div', attrs={'class': re.compile('product-tile grid-item|product-tile grid-item -not-available')})
for item in product_list:
try:
link = item.find('div', attrs={'class': 'title'}).find('a', attrs={'class': 'link'}).get('href')
except:
link = None
try:
code = item.find('div', attrs={'class': 'wtis-id'}).find('span').text.strip()
except:
code = None
if link != None:
link = 'https://nn.vseinstrumenti.ru' + link
payload = {'code': code, 'link': link}
urls_list.append(payload)
return urls_list
if __name__ == '__main__':
main()
Напишем скрип парсинга данных на странице товара. Назовем файл parser_vseinstrumenti.py.
Собирать будем заголовок h1, все изображения, описание, технические характеристики, преимущества, производителя, информацию об упаковки, цену.
import bs4
from lib.get_data_from_website import GetData
from lib.csv_handler import CsvHandler
from bs4 import BeautifulSoup
import re
import os
import time
Soup = bs4.BeautifulSoup
def main():
# Готовим файл для записи в него результатов парсинга
result_filename = 'vse_result.csv'
result_file_header = ['code', 'h1', 'price', 'brand', 'description', 'weight', 'length', 'width', 'height',
'table_lenght', 'table_width', 'max_table_load']
csv_result_file = CsvHandler(result_filename)
if not os.path.isfile(result_filename):
csv_result_file.create_headers_csv_semicolon(result_file_header)
# Загружаем ссылки из файла и получаем спаршенные данные
urls = CsvHandler('product_links.csv').read_csv_semicolon()
for item in urls:
try:
html = GetData(item.get('link')).get_html()
data = parse_data(html, item.get('code'))
csv_result_file.write_to_csv_semicolon(data)
except Exception as e:
error = item.get('code') + ';' + item.get('link') + ' ==>> ' + str(e)
write_to_log('error.log', error)
time.sleep(3)
def parse_data(html: str, code: str) -> None:
soup = BeautifulSoup(html, 'html.parser')
# Парсим данные
h1 = get_h1(soup)
price = get_price(soup)
product_description = trash_cleaner(get_description(soup))
product_advantages = trash_cleaner(get_advantages(soup))
brand = get_brand(soup)
specifications = get_specifications(soup)
package = get_package(soup)
get_images(soup, code)
product_properties = generate_filter_properties(specifications)
# Вывод наименования и кода товара для контроля парсинга
print(code, h1, sep=' ==> ')
# Обединяем данные для поля "Подробное описание товара"
text_description = join_product_description(product_description, prepare_specification(specifications), product_advantages)
# Формируем словарь, который будем отправлять в csv
main_properties = {
'code': code if code else None,
'h1': h1 if h1 else None,
'price': price if price else None,
'brand': brand if brand else None,
'description': text_description,
}
dimensions = {
'weight': int(float(replace_comma_with_dot(package.get('weight'))) * 1000) if package.get('weight') else None,
'length': replace_comma_with_dot(package.get('length')) if package.get('length') else None,
'width': replace_comma_with_dot(package.get('width')) if package.get('width') else None,
'height': replace_comma_with_dot(package.get('height')) if package.get('height') else None,
}
payload = main_properties | dimensions | product_properties
return payload
def get_h1(soup: Soup) -> str:
try:
h1 = soup.h1.string.strip()
except:
h1 = None
return h1
def get_price(soup: Soup) -> str:
try:
price = soup.find('div', attrs={'class': 'current-price'}).find('span').text
price = re.findall(r'[0-9]+', price)
price = price[0] + price[1]
except:
price = None
return price
def get_description(soup: Soup) -> str:
try:
description = soup.find('div', attrs={'class': 'content-block'})
product_description = ''.join([str(value) for value in description.children if
len(str(value).strip().replace(';', '').replace('\n', '')) > 0])
except:
product_description = None
return product_description
def get_advantages(soup):
try:
advantages = soup.find('div', attrs={'class': 'advantages spoiler'}).find('div', attrs={'class': 'content-block'})
product_description = ''.join([str(value) for value in advantages.children if
len(str(value).strip().replace(';', '').replace('\n', '')) > 0])
except:
product_description = None
return product_description
def get_brand(soup: Soup) -> str:
try:
brand = soup.find('div', attrs={'class': 'brand'}).find('img').get('alt').strip()
except:
brand = None
return brand
def get_specifications(soup: Soup) -> list:
# Парсинг тех характеристик
try:
data_spec = []
specifications = soup.find('ul', attrs={'class': 'dotted-list'}).contents
for elem in specifications:
if len(elem) != 1:
name = elem.find('span', attrs={'itemprop': 'name'}).text.strip()
value = elem.find('span', attrs={'class': 'value'}).text.strip()
data_spec.append([name, value])
except:
pass
return data_spec
def get_package(soup: Soup) -> dict:
# Парсинг информации об упаковке
try:
package = soup.find_all('ul', attrs={'class': 'unordered-list'})[1].find_all('li')
unit = package[0].string.split(':')[1].strip().lower()
weight = package[1].string.split(':')[1].strip().lower()
length = package[2].string.split(':')[1].strip().lower()
width = package[3].string.split(':')[1].strip().lower()
height = package[4].string.split(':')[1].strip().lower()
except:
unit = None
weight = None
length = None
width = None
height = None
payload = {
'unit': unit,
'weight': weight,
'length': length,
'width': width,
'height': height,
}
return payload
def get_images(soup: Soup, code: str) -> None:
# Парсинг картинок
try:
images = soup.find('div', attrs={'class': 'listing-carousel'}).find_all('img')
i = 1
for img in images:
directory_path = f'images/{code}/'
file_name_extension = img.get('data-src').split('.')[-1]
file_name = code + '-' + str(i) + '.' + file_name_extension
# check_directory(directory_path)
print(file_name)
GetData(img.get('data-src')).download_file(directory_path, file_name)
i += 1
except:
pass
def generate_filter_properties(data: list):
table_lenght = None
table_width = None
max_table_load = None
for prop in data:
if re.findall(r'Длина рабочего стола', prop[0]):
table_lenght = prop[1]
if re.findall(r'Ширина рабочего стола', prop[0]):
table_width = prop[1]
if re.findall(r'Max нагрузка на стол', prop[0]):
max_table_load = prop[1]
payload = {
'table_lenght': table_lenght,
'table_width': table_width,
'max_table_load': max_table_load,
}
return payload
def join_product_description(product_description: str, specification: str, product_advantages: str) -> str:
sku_description_tags = "<h2>Описание</h2>" \
f"{product_description}"
advantages_tags = "<h2>Преимущества</h2>" \
f"{product_advantages}"
if product_description and specification and product_advantages:
return specification + sku_description_tags + advantages_tags
if product_description and specification and product_advantages == None:
return specification + sku_description_tags
if product_description and specification == None and product_advantages == None:
return sku_description_tags
if product_description and specification == None and product_advantages:
return sku_description_tags + product_advantages
if specification and product_description == None and product_advantages == None:
return specification
if specification and product_description == None and product_advantages:
return specification + product_advantages
def prepare_specification(data: list) -> str:
if data:
trs = []
for item in data:
if item[0]:
row = f"<tr>" \
f"<th>{item[0]}</th>" \
f"<td>{item[1]}</td><" \
f"/tr>"
trs.append(row)
table = "<h2>Технические характеристики</h2>" \
"<table><tbody>"
end_table = "</tbody></table>"
if len(trs) > 0:
tbody = ''.join(trs)
result = table + tbody + end_table
return result
result = ''
return result
def trash_cleaner(data: str) -> str:
if data:
data = data.replace(' style="text-align:justify"', '')
data = re.sub(r'\xa0', ' ', data)
data = re.sub('\n|\r', '', data)
return data
def replace_comma_with_dot(data: str) -> str:
if data:
return data.replace(',','.')
def check_directory(directory: str) -> None:
if not os.path.isdir(directory):
os.mkdir(directory)
def write_to_log(filename, data):
with open(filename, 'a', encoding='UTF-8') as f:
f.write(data + '\n')
if __name__ == '__main__':
main()
Результаты работы скрипта можно скачать здесь.