Парсинг базы контактов организаций в 2GIS

Извлечение телефонов и email организаций из онлайн-сервиса

2ГИС по версии Forbes входит в десятку крупнейших интернет-компаний России. На ноябрь 2021 года карты-справочники компании содержали 20,5 тыс. населённых пунктов (из них 790 городов) в 12 странах, что дает возможность автоматизированно извлекать информацию для создания уникальных баз данных контактов компаний.

2ГИС имеет защиту от парсинга, следовательно при сборе данных нам нужно эмулировать поведение реальных людей. Для выполнения подобной задачи остановимся на инструментах Python и Selenium.

Подготовка к парсингу

Традиционно для обхода защиты будем использовать библиотеку Selenium, BeautifulSoup для Python для извлечения данных из кода, базой прокси-серверов, а также собственными классами, которые можно скачать на github.
Вебдрайвер для Chrome браузера требуемой версии можно скачать на официальном сайте.

Алгоритм получения информации реализуем следующий:
  1. в случайном порядке получим прокси и user-agent из файла config.py;
  2. при помощи библиотеки Selenium получим контент (html код);
  3. сохраним код файл;
  4. из файла прочитаем код;
  5. в коде найдем и вырежем словарь с json-данными;
  6. спарсим из с json-данных контакты компании.
Установку виртуального окружения и необходимых библиотек расписывать не будем, так как это стандартные действия.
Файл requrements.txt для установки всех библиотек:
async-generator==1.10
attrs==21.4.0
beautifulsoup4==4.11.1
blinker==1.4
Brotli==1.0.9
certifi==2022.6.15
cffi==1.15.0
charset-normalizer==2.0.12
cryptography==37.0.2
fake-useragent==0.1.11
h11==0.13.0
h2==4.1.0
hpack==4.0.0
hyperframe==6.0.1
idna==3.3
kaitaistruct==0.9
outcome==1.2.0
pyasn1==0.4.8
pycparser==2.21
pyOpenSSL==22.0.0
pyparsing==3.0.9
PySocks==1.7.1
requests==2.28.0
selenium==4.2.0
selenium-wire==4.6.4
sniffio==1.2.0
sortedcontainers==2.4.0
soupsieve==2.3.2.post1
trio==0.21.0
trio-websocket==0.9.2
urllib3==1.26.9
Werkzeug==2.0.3
wsproto==1.1.0
zstandard==0.17.0

Реализация скрипта парсинга 2GIS

Пишем скрипт загрузки кода страницы в файл

Создадим файл data_loader.py, который будет загружать код страницы. Импортируем библиотеки:
import lib.config
import random
import lib.headers

from seleniumwire import webdriver
from selenium.webdriver.chrome.service import Service
Напишем код для случайной выборки прокси и юзер-агента:
def get_headers_proxy() -> dict:
    '''
    The config file must have dict:
        {
            'http_proxy':'http://user:password@ip:port',
            'user-agent': 'user_agent name'
        }
    '''

    try:
        users = lib.config.USER_AGENTS_PROXY_LIST
        persona = random.choice(users)
    except ImportError:
        persona = None
    return persona
Загрузим html-код страницы, используя библиотеку Selenium
def get_html(persona, url):
    options = webdriver.ChromeOptions()
    options.add_argument(f"user-agent={persona['user-agent']}")
    options.add_argument("--disable-blink-features=AutomationControlled")

    options_proxy = {
        'proxy': {
            'https': persona['http_proxy'],
            'no_proxy': 'localhost,127.0.0.1:8080'
        }
    }

    s = Service(executable_path="/lib/chromedriver")

    driver = webdriver.Chrome(options=options, service=s, seleniumwire_options=options_proxy)

    try:
        driver.request_interceptor = lib.headers.interceptor
        driver.set_window_size(1920, 1080)
        driver.get(url)
        html = driver.page_source
    except Exception as e:
        print(e)
        html = None
    finally:
        driver.close()
        driver.quit()
    return html
И допишем недостающие функции скрипта: генерацию имени файла, сохранения в файл, основной функции:
def filename_generator(url):
    return url.split('/')[-2]

def write_to_file(text, filename):
    with open(f"data/{filename}", 'w', encoding='utf-8') as f:
        f.write(text)

def main():
    url = 'https://2gis.ru/moscow/search/%D0%92%D1%85%D0%BE%D0%B4%D0%BD%D1%8B%D0%B5%20%D0%B4%D0%B2%D0%B5%D1%80%D0%B8/rubricId/9972/firm/70000001022846376/37.722346%2C55.61045'
    persona = get_headers_proxy()
    html = get_html(persona, url)
    write_to_file(html, filename_generator(url))
Итоговый скрипт будет таким:
import lib.config
import random
import lib.headers

from seleniumwire import webdriver
from selenium.webdriver.chrome.service import Service

def get_headers_proxy() -> dict:
    '''
    The config file must have dict:
        {
            'http_proxy':'http://user:password@ip:port',
            'user-agent': 'user_agent name'
        }
    '''

    try:
        users = lib.config.USER_AGENTS_PROXY_LIST
        persona = random.choice(users)
    except ImportError:
        persona = None
    return persona

def get_html(persona: dict, url: str) -> str:
    options = webdriver.ChromeOptions()
    options.add_argument(f"user-agent={persona['user-agent']}")
    options.add_argument("--disable-blink-features=AutomationControlled")

    options_proxy = {
        'proxy': {
            'https': persona['http_proxy'],
            'no_proxy': 'localhost,127.0.0.1:8080'
        }
    }

    s = Service(executable_path="/Users/noy/Documents/dev/PYTHON/ozon_live/lib/chromedriver")

    driver = webdriver.Chrome(options=options, service=s, seleniumwire_options=options_proxy)

    try:
        driver.request_interceptor = lib.headers.interceptor
        driver.set_window_size(1920, 1080)
        driver.get(url)
        html = driver.page_source
    except Exception as e:
        print(e)
        html = None
    finally:
        driver.close()
        driver.quit()
    return html

def filename_generator(url: str) -> str:
    return url.split('/')[-2]

def write_to_file(text: str, filename: str):
    with open(f"data/{filename}", 'w', encoding='utf-8') as f:
        f.write(text)

def main():
    url = 'https://2gis.ru/moscow/search/%D0%92%D1%85%D0%BE%D0%B4%D0%BD%D1%8B%D0%B5%20%D0%B4%D0%B2%D0%B5%D1%80%D0%B8/rubricId/9972/firm/70000001022846376/37.722346%2C55.61045'
    persona = get_headers_proxy()
    html = get_html(persona, url)
    write_to_file(html, filename_generator(url))


if __name__ == '__main__':
    main()

Пишем скрипт скрипт извлечения данных

Создадим файл contacts_parser.py. И импортируем необходимые библиотеки:
import re
import glob
import json
from bs4 import BeautifulSoup
from pprint import pprint
Получим все пути к загруженным данным и извлечем из html кусок json-кода с контактами компании:
def load_data(source):
    with open(source, 'r', encoding='utf-8') as f:
        return f.read()

def get_json_data(html):
    soup = BeautifulSoup(html, 'html.parser')
    scripts = soup.find_all('script', string=re.compile("__customcfg"))
    data = re.findall(r'var initialState = JSON\.parse\(.*?__REACT_QUERY_STATE__', str(scripts[0]))
    return json.loads(data[0][31:-38])
Для примера, будем собирать название организации, адрес сайта, телефоны, адреса электронных почт, группы в вконтакте и одноклассниках, адрес канала на Youtube, Pinterest. Спарсим контактные данные из json и распечатаем в консоли:
def get_company_data(data: dict, filename: str):
    company_name = data.get('data').get('entity').get('profile').get(filename).get('data').get('org').get('name')
    contacts = data.get('data').get('entity').get('profile').get(filename).get('data').get('contact_groups')
    tel_list = []
    email_list = []
    wedsite_list = []
    vk_list = []
    ok_list = []
    youtube_list = []
    pinterest_list = []
    i = 0
    while i < len(contacts):
        for data in contacts[i].get('contacts'):
            if data.get('type') == 'phone': tel_list.append(data.get('value'))
            if data.get('type') == 'website': wedsite_list.append(data.get('url'))
            if data.get('type') == 'email': email_list.append(data.get('value'))
            if data.get('type') == 'vkontakte': vk_list.append(data.get('value'))
            if data.get('type') == 'odnoklassniki': ok_list.append(data.get('value'))
            if data.get('type') == 'youtube': youtube_list.append(data.get('value'))
            if data.get('type') == 'pinterest': pinterest_list.append(data.get('value'))
        i += 1

    payload = {
        'company_name': company_name,
        'website': wedsite_list,
        'tel': tel_list,
        'email': email_list,
        'vk': vk_list,
        'ok': ok_list,
        'youtube': youtube_list,
        'pinterest': pinterest_list,
    }
    pprint(payload)
Готовый скрипт в целом будет выглядеть так:
import re
import glob
import json
from bs4 import BeautifulSoup
from pprint import pprint


def load_data(source: str) -> str:
    with open(source, 'r', encoding='utf-8') as f:
        return f.read()

def get_json_data(html: str) -> dict:
    soup = BeautifulSoup(html, 'html.parser')
    scripts = soup.find_all('script', string=re.compile("__customcfg"))
    data = re.findall(r'var initialState = JSON\.parse\(.*?__REACT_QUERY_STATE__', str(scripts[0]))
    return json.loads(data[0][31:-38])

def get_company_data(data: dict, filename: str):
    company_name = data.get('data').get('entity').get('profile').get(filename).get('data').get('org').get('name')
    contacts = data.get('data').get('entity').get('profile').get(filename).get('data').get('contact_groups')
    tel_list = []
    email_list = []
    wedsite_list = []
    vk_list = []
    ok_list = []
    youtube_list = []
    pinterest_list = []
    i = 0
    while i < len(contacts):
        for data in contacts[i].get('contacts'):
            if data.get('type') == 'phone': tel_list.append(data.get('value'))
            if data.get('type') == 'website': wedsite_list.append(data.get('url'))
            if data.get('type') == 'email': email_list.append(data.get('value'))
            if data.get('type') == 'vkontakte': vk_list.append(data.get('value'))
            if data.get('type') == 'odnoklassniki': ok_list.append(data.get('value'))
            if data.get('type') == 'youtube': youtube_list.append(data.get('value'))
            if data.get('type') == 'pinterest': pinterest_list.append(data.get('value'))
        i += 1

    payload = {
        'company_name': company_name,
        'website': wedsite_list,
        'tel': tel_list,
        'email': email_list,
        'vk': vk_list,
        'ok': ok_list,
        'youtube': youtube_list,
        'pinterest': pinterest_list,
    }
    pprint(payload)


def main():
    sources = glob.glob('data/*')
    for source in sources:
        html = load_data(source)
        json_data = get_json_data(html)
        filename = source.split('/')[-1]
        get_company_data(json_data, filename)


if __name__ == '__main__':
    main()
После окончания работы скрипта, получим результат
Осталось только выгрузить данные в файл для дальнейшей обработки, но это уже другая история.

Необходимы услуги парсинга?