According to Forbes, 2GIS is one of the ten largest internet companies in Russia. As of November 2021, its map directories covered 20.5 thousand settlements (including 790 cities) across 12 countries, which makes the service a rich source for automatically extracting information and building unique databases of company contacts.
2GIS protects itself against scraping, so to collect the data we have to emulate the behaviour of real users. For this task we will use Python and Selenium. The project's pinned dependencies are listed below:
async-generator==1.10
attrs==21.4.0
beautifulsoup4==4.11.1
blinker==1.4
Brotli==1.0.9
certifi==2022.6.15
cffi==1.15.0
charset-normalizer==2.0.12
cryptography==37.0.2
fake-useragent==0.1.11
h11==0.13.0
h2==4.1.0
hpack==4.0.0
hyperframe==6.0.1
idna==3.3
kaitaistruct==0.9
outcome==1.2.0
pyasn1==0.4.8
pycparser==2.21
pyOpenSSL==22.0.0
pyparsing==3.0.9
PySocks==1.7.1
requests==2.28.0
selenium==4.2.0
selenium-wire==4.6.4
sniffio==1.2.0
sortedcontainers==2.4.0
soupsieve==2.3.2.post1
trio==0.21.0
trio-websocket==0.9.2
urllib3==1.26.9
Werkzeug==2.0.3
wsproto==1.1.0
zstandard==0.17.0
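First comes the script that downloads a company page: it launches Chrome through Selenium Wire so the traffic goes through a proxy, and it picks a random proxy/user-agent pair (a "persona") from a local config module.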
import random

import lib.config
import lib.headers
from seleniumwire import webdriver
from selenium.webdriver.chrome.service import Service


def get_headers_proxy() -> dict:
    """Pick a random 'persona' (proxy + user agent) from the config.

    lib/config.py must define USER_AGENTS_PROXY_LIST, a list of dicts of the form:
    {
        'http_proxy': 'http://user:password@ip:port',
        'user-agent': 'user_agent name'
    }
    """
    try:
        users = lib.config.USER_AGENTS_PROXY_LIST
        persona = random.choice(users)
    except (AttributeError, IndexError):
        # the config has no USER_AGENTS_PROXY_LIST, or the list is empty
        persona = None
    return persona


def get_html(persona: dict, url: str) -> str:
    options = webdriver.ChromeOptions()
    # present the persona's user agent and hide the navigator.webdriver flag
    options.add_argument(f"user-agent={persona['user-agent']}")
    options.add_argument("--disable-blink-features=AutomationControlled")
    # route traffic through the persona's proxy via Selenium Wire
    options_proxy = {
        'proxy': {
            'https': persona['http_proxy'],
            'no_proxy': 'localhost,127.0.0.1:8080'
        }
    }
    s = Service(executable_path="/Users/noy/Documents/dev/PYTHON/ozon_live/lib/chromedriver")
    driver = webdriver.Chrome(options=options, service=s, seleniumwire_options=options_proxy)
    try:
        # rewrite request headers on the fly (see lib/headers.py below)
        driver.request_interceptor = lib.headers.interceptor
        # a realistic desktop window size
        driver.set_window_size(1920, 1080)
        driver.get(url)
        html = driver.page_source
    except Exception as e:
        print(e)
        html = None
    finally:
        driver.quit()
    return html


def filename_generator(url: str) -> str:
    # the second-to-last URL segment is the 2GIS firm id, e.g. 70000001022846376
    return url.split('/')[-2]


def write_to_file(text: str, filename: str):
    # the data/ directory must already exist
    with open(f"data/{filename}", 'w', encoding='utf-8') as f:
        f.write(text)


def main():
    url = 'https://2gis.ru/moscow/search/%D0%92%D1%85%D0%BE%D0%B4%D0%BD%D1%8B%D0%B5%20%D0%B4%D0%B2%D0%B5%D1%80%D0%B8/rubricId/9972/firm/70000001022846376/37.722346%2C55.61045'
    persona = get_headers_proxy()
    html = get_html(persona, url)
    write_to_file(html, filename_generator(url))


if __name__ == '__main__':
    main()
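The fetching script relies on two small helper modules that are not shown above: lib.config, which stores the list of proxy/user-agent personas, and lib.headers, which provides the Selenium Wire request interceptor. Their exact contents depend on your proxies and on which headers you want to send, but a minimal sketch could look like this (all values are placeholders):

# lib/config.py
# Each persona pairs a proxy with a user-agent string; fill in your own values.
USER_AGENTS_PROXY_LIST = [
    {
        'http_proxy': 'http://user:password@ip:port',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                      '(KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36',
    },
    # ... more personas
]


# lib/headers.py
def interceptor(request):
    # Selenium Wire calls this for every outgoing request; to replace a header,
    # delete the old value first. Which headers you adjust is up to you; this
    # Accept-Language value is only an example.
    del request.headers['Accept-Language']
    request.headers['Accept-Language'] = 'ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7'

In practice it also helps to pause between requests, for example with time.sleep(random.uniform(2, 7)), so that the request pattern looks less mechanical.

The second script walks through the files saved in data/, finds the JSON state that 2GIS embeds in each page, and extracts the company's contact details: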
import glob
import json
import re
from pprint import pprint

from bs4 import BeautifulSoup


def load_data(source: str) -> str:
    with open(source, 'r', encoding='utf-8') as f:
        return f.read()


def get_json_data(html: str) -> dict:
    # 2GIS renders the page from a JSON state object embedded in the <script>
    # tag that also mentions __customcfg
    soup = BeautifulSoup(html, 'html.parser')
    scripts = soup.find_all('script', string=re.compile("__customcfg"))
    data = re.findall(r'var initialState = JSON\.parse\(.*?__REACT_QUERY_STATE__', str(scripts[0]))
    # strip the JS wrapper: the first 31 characters are "var initialState = JSON.parse("
    # plus its opening quote, and the last 38 are the closing of that call up to
    # the __REACT_QUERY_STATE__ marker
    return json.loads(data[0][31:-38])


def get_company_data(data: dict, filename: str) -> dict:
    # profile data is keyed by the firm id, which is also the saved file's name
    company_name = data.get('data').get('entity').get('profile').get(filename).get('data').get('org').get('name')
    contacts = data.get('data').get('entity').get('profile').get(filename).get('data').get('contact_groups')

    tel_list = []
    email_list = []
    website_list = []
    vk_list = []
    ok_list = []
    youtube_list = []
    pinterest_list = []

    for group in contacts:
        for contact in group.get('contacts'):
            contact_type = contact.get('type')
            if contact_type == 'phone':
                tel_list.append(contact.get('value'))
            elif contact_type == 'website':
                website_list.append(contact.get('url'))
            elif contact_type == 'email':
                email_list.append(contact.get('value'))
            elif contact_type == 'vkontakte':
                vk_list.append(contact.get('value'))
            elif contact_type == 'odnoklassniki':
                ok_list.append(contact.get('value'))
            elif contact_type == 'youtube':
                youtube_list.append(contact.get('value'))
            elif contact_type == 'pinterest':
                pinterest_list.append(contact.get('value'))

    payload = {
        'company_name': company_name,
        'website': website_list,
        'tel': tel_list,
        'email': email_list,
        'vk': vk_list,
        'ok': ok_list,
        'youtube': youtube_list,
        'pinterest': pinterest_list,
    }
    pprint(payload)
    return payload


def main():
    sources = glob.glob('data/*')
    for source in sources:
        html = load_data(source)
        json_data = get_json_data(html)
        # the file name is the firm id written by the fetching script
        filename = source.split('/')[-1]
        get_company_data(json_data, filename)


if __name__ == '__main__':
    main()
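Printing the payload is enough for a demo, but to build an actual contact database you will want to persist it. Below is a minimal sketch (not part of the original script) that appends each company record to a CSV file; it assumes get_company_data returns the payload dict, as in the listing above, and the file name companies.csv is arbitrary:

import csv
import os


def save_to_csv(payload: dict, path: str = 'companies.csv') -> None:
    # Append one company record; list fields are joined with '; '.
    fieldnames = ['company_name', 'website', 'tel', 'email', 'vk', 'ok', 'youtube', 'pinterest']
    row = {key: '; '.join(value) if isinstance(value, list) else value
           for key, value in payload.items()}
    write_header = not os.path.exists(path)
    with open(path, 'a', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if write_header:
            writer.writeheader()
        writer.writerow(row)

With this helper, the call in main() becomes save_to_csv(get_company_data(json_data, filename)).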