Added .gitignore entries used during dev.

Added the option to put comments in the input.txt file.
Added options to scrape group posts and a single group post (the link type is identified automatically); see the example below.
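For illustration, a hypothetical input.txt mixing the supported link types (comment lines and blank lines are skipped; the group and post ids below are made up):

# profiles
https://www.facebook.com/zuck
# a group, then a single group post
https://www.facebook.com/groups/123456789
https://www.facebook.com/groups/123456789/permalink/987654321/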
This commit is contained in:
Amit Y 2020-04-14 17:06:05 +03:00
parent 67f2be02f6
commit d654c55344
9 changed files with 652 additions and 266 deletions

9
.gitignore vendored
View file

@ -115,8 +115,13 @@ dmypy.json
## Generated data
/data
scraper/credentials.yaml
scraper/input.txt
scraper/data/
scraper/debug.log
##misplaced configuration files
scraper/selectors.json
scraper/params.json
scraper/input.txt
## Python venv
venv
venv

View file

@ -1,2 +1,3 @@
#Lines starting with # and empty lines will be ignored
https://www.facebook.com/andrew.ng.96
https://www.facebook.com/zuck

View file

@ -1,121 +1,151 @@
{
"Friends": {
"scan_list": [
"All",
"Mutual Friends",
"Following",
"Followers",
"Work",
"College",
"Current City",
"Hometown"
],
"section": [
"/friends",
"/friends_mutual",
"/following",
"/followers",
"/friends_work",
"/friends_college",
"/friends_current_city",
"/friends_hometown"
],
"elements_path": [
"//*[contains(@id,'pagelet_timeline_medley_friends')][1]/div[2]/div/ul/li/div/a",
"//*[contains(@id,'pagelet_timeline_medley_friends')][1]/div[2]/div/ul/li/div/a",
"//*[contains(@class,'_3i9')][1]/div/div/ul/li[1]/div[2]/div/div/div/div/div[2]/ul/li/div/a",
"//*[contains(@class,'fbProfileBrowserListItem')]/div/a",
"//*[contains(@id,'pagelet_timeline_medley_friends')][1]/div[2]/div/ul/li/div/a",
"//*[contains(@id,'pagelet_timeline_medley_friends')][1]/div[2]/div/ul/li/div/a",
"//*[contains(@id,'pagelet_timeline_medley_friends')][1]/div[2]/div/ul/li/div/a",
"//*[contains(@id,'pagelet_timeline_medley_friends')][1]/div[2]/div/ul/li/div/a"
],
"file_names": [
"All Friends.txt",
"Mutual Friends.txt",
"Following.txt",
"Followers.txt",
"Work Friends.txt",
"College Friends.txt",
"Current City Friends.txt",
"Hometown Friends.txt"
],
"save_status":0
},
"Photos": {
"scan_list": [
"'s Photos",
"Photos of"
],
"section": [
"/photos_all",
"/photos_of"
],
"elements_path": [
"//*[contains(@id, 'pic_')]",
"//*[contains(@id, 'pic_')]"
],
"file_names": [
"Uploaded Photos.txt",
"Tagged Photos.txt"
],
"save_status": 1
},
"Videos": {
"scan_list": [
"'s Videos",
"Videos of"
],
"section": [
"/videos_by",
"/videos_of"
],
"elements_path": [
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul",
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul"
],
"file_names": [
"Uploaded Videos.txt",
"Tagged Videos.txt"
],
"save_status": 2
},
"About": {
"scan_list": [],
"section": [
"/about?section=overview",
"/about?section=education",
"/about?section=living",
"/about?section=contact-info",
"/about?section=relationship",
"/about?section=bio",
"/about?section=year-overviews"
],
"elements_path": [
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul/li/div/div[2]/div/div",
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul/li/div/div[2]/div/div",
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul/li/div/div[2]/div/div",
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul/li/div/div[2]/div/div",
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul/li/div/div[2]/div/div",
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul/li/div/div[2]/div/div",
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul/li/div/div[2]/div/div"
],
"file_names": [
"Overview.txt",
"Work and Education.txt",
"Places Lived.txt",
"Contact and Basic Info.txt",
"Family and Relationships.txt",
"Details About.txt",
"Life Events.txt"
],
"save_status": 3
},
"Posts": {
"scan_list": [],
"section": [],
"elements_path": ["//div[@class='_5pcb _4b0l _2q8l']"],
"file_names": ["Posts.txt"],
"save_status": 4
}
"Friends": {
"scan_list": [
"All",
"Mutual Friends",
"Following",
"Followers",
"Work",
"College",
"Current City",
"Hometown"
],
"section": [
"/friends",
"/friends_mutual",
"/following",
"/followers",
"/friends_work",
"/friends_college",
"/friends_current_city",
"/friends_hometown"
],
"elements_path": [
"//*[contains(@id,'pagelet_timeline_medley_friends')][1]/div[2]/div/ul/li/div/a",
"//*[contains(@id,'pagelet_timeline_medley_friends')][1]/div[2]/div/ul/li/div/a",
"//*[contains(@class,'_3i9')][1]/div/div/ul/li[1]/div[2]/div/div/div/div/div[2]/ul/li/div/a",
"//*[contains(@class,'fbProfileBrowserListItem')]/div/a",
"//*[contains(@id,'pagelet_timeline_medley_friends')][1]/div[2]/div/ul/li/div/a",
"//*[contains(@id,'pagelet_timeline_medley_friends')][1]/div[2]/div/ul/li/div/a",
"//*[contains(@id,'pagelet_timeline_medley_friends')][1]/div[2]/div/ul/li/div/a",
"//*[contains(@id,'pagelet_timeline_medley_friends')][1]/div[2]/div/ul/li/div/a"
],
"file_names": [
"All Friends.txt",
"Mutual Friends.txt",
"Following.txt",
"Followers.txt",
"Work Friends.txt",
"College Friends.txt",
"Current City Friends.txt",
"Hometown Friends.txt"
],
"save_status": 0
},
"Members": {
"scan_list": [
"All"
],
"section": [
"/members"
],
"elements_path": [
"//*[contains(@id,'pagelet_group_members')][1]/div[2]/div/ul/li/div/a"
],
"file_names": [
"All Members.txt"
],
"save_status": 0
},
"Photos": {
"scan_list": [
"'s Photos",
"Photos of"
],
"section": [
"/photos_all",
"/photos_of"
],
"elements_path": [
"//*[contains(@id, 'pic_')]",
"//*[contains(@id, 'pic_')]"
],
"file_names": [
"Uploaded Photos.txt",
"Tagged Photos.txt"
],
"save_status": 1
},
"Videos": {
"scan_list": [
"'s Videos",
"Videos of"
],
"section": [
"/videos_by",
"/videos_of"
],
"elements_path": [
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul",
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul"
],
"file_names": [
"Uploaded Videos.txt",
"Tagged Videos.txt"
],
"save_status": 2
},
"About": {
"scan_list": [],
"section": [
"/about?section=overview",
"/about?section=education",
"/about?section=living",
"/about?section=contact-info",
"/about?section=relationship",
"/about?section=bio",
"/about?section=year-overviews"
],
"elements_path": [
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul/li/div/div[2]/div/div",
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul/li/div/div[2]/div/div",
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul/li/div/div[2]/div/div",
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul/li/div/div[2]/div/div",
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul/li/div/div[2]/div/div",
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul/li/div/div[2]/div/div",
"//*[contains(@id, 'pagelet_timeline_app_collection_')]/ul/li/div/div[2]/div/div"
],
"file_names": [
"Overview.txt",
"Work and Education.txt",
"Places Lived.txt",
"Contact and Basic Info.txt",
"Family and Relationships.txt",
"Details About.txt",
"Life Events.txt"
],
"save_status": 3
},
"Posts": {
"scan_list": [],
"section": [],
"elements_path": [
"//div[@class='_5pcb _4b0l _2q8l']"
],
"file_names": [
"Posts.txt"
],
"save_status": 4
},
"GroupPosts": {
"scan_list": [],
"section": [],
"elements_path": [
"//div[@class='_4-u2 mbm _4mrt _5jmm _5pat _5v3q _7cqq _4-u8']"
],
"file_names": [
"Posts.txt"
],
"save_status": 5
}
}
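Each top-level entry above is consumed the same way by scrape_data() in the scraper; a rough sketch for the new "GroupPosts" entry (argument names follow the code further down):

    item = "GroupPosts"
    scrape_data(url,
                [None],                         # scan_list is replaced with [None] for posts
                params[item]["section"],        # [] -> only the group url itself is visited
                params[item]["elements_path"],  # xpath used to collect the post elements
                params[item]["save_status"],    # 5 -> extract_and_write_group_posts()
                params[item]["file_names"])     # ["Posts.txt"]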

View file

@ -1,2 +1,3 @@
selenium==3.141.0
pyyaml
pyyaml
webdriver_manager

View file

@ -1,7 +1,5 @@
import calendar
import json
import os
import platform
import sys
import urllib.request
import yaml
@ -9,7 +7,7 @@ import utils
import argparse
from selenium import webdriver
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
@ -53,6 +51,12 @@ def get_facebook_images_url(img_links):
# takes a url and downloads image from that url
def image_downloader(img_links, folder_name):
"""
Download images from a list of image urls.
:param img_links:
:param folder_name:
:return: list of image names downloaded
"""
img_names = []
try:
@ -84,7 +88,6 @@ def image_downloader(img_links, folder_name):
os.chdir(parent)
except Exception:
print("Exception (image_downloader):", sys.exc_info()[0])
return img_names
@ -96,97 +99,35 @@ def extract_and_write_posts(elements, filename):
try:
f = open(filename, "w", newline="\r\n", encoding="utf-8")
f.writelines(
" TIME || TYPE || TITLE || STATUS || LINKS(Shared Posts/Shared Links etc) "
" TIME || TYPE || TITLE || STATUS || LINKS(Shared Posts/Shared Links etc) || SEE_MORE_LINK || POST_ID "
+ "\n"
+ "\n"
)
ids = []
for x in elements:
try:
title = " "
status = " "
link = ""
time = " "
# id
post_id = utils.get_post_id(x)
ids.append(post_id)
# time
time = utils.get_time(x)
# title
title = utils.get_title(x, selectors)
if title.text.find("shared a memory") != -1:
x = x.find_element_by_xpath(selectors.get("title_element"))
title = utils.get_title(x, selectors)
status = utils.get_status(x, selectors)
if (
title.text
== driver.find_element_by_id(selectors.get("title_text")).text
):
if status == "":
temp = utils.get_div_links(x, "img", selectors)
if (
temp == ""
): # no image tag which means . it is not a life event
link = utils.get_div_links(x, "a", selectors).get_attribute(
"href"
)
type = "status update without text"
else:
type = "life event"
link = utils.get_div_links(x, "a", selectors).get_attribute(
"href"
)
status = utils.get_div_links(x, "a", selectors).text
else:
type = "status update"
if utils.get_div_links(x, "a", selectors) != "":
link = utils.get_div_links(x, "a", selectors).get_attribute(
"href"
)
elif title.text.find(" shared ") != -1:
x1, link = utils.get_title_links(title)
type = "shared " + x1
elif title.text.find(" at ") != -1 or title.text.find(" in ") != -1:
if title.text.find(" at ") != -1:
x1, link = utils.get_title_links(title)
type = "check in"
elif title.text.find(" in ") != 1:
status = utils.get_div_links(x, "a", selectors).text
elif (
title.text.find(" added ") != -1 and title.text.find("photo") != -1
):
type = "added photo"
link = utils.get_div_links(x, "a", selectors).get_attribute("href")
elif (
title.text.find(" added ") != -1 and title.text.find("video") != -1
):
type = "added video"
link = utils.get_div_links(x, "a", selectors).get_attribute("href")
else:
type = "others"
if not isinstance(title, str):
title = title.text
status = status.replace("\n", " ")
title = title.replace("\n", " ")
link, status, title, post_type = get_status_and_title(link, x)
line = (
str(time)
+ " || "
+ str(type)
+ str(post_type)
+ " || "
+ str(title)
+ " || "
+ str(status)
+ " || "
+ str(link)
+ " || "
+ str(post_id)
+ "\n"
)
@ -197,12 +138,114 @@ def extract_and_write_posts(elements, filename):
except Exception:
pass
f.close()
except ValueError:
print("Exception (extract_and_write_posts)", "Status =", sys.exc_info()[0])
except Exception:
print("Exception (extract_and_write_posts)", "Status =", sys.exc_info()[0])
return
def get_status_and_title(link, x):
# title
title = utils.get_title(x, selectors)
if title.text.find("shared a memory") != -1:
x = x.find_element_by_xpath(selectors.get("title_element"))
title = utils.get_title(x, selectors)
status = utils.get_status(x, selectors)
if title.text == driver.find_element_by_id(selectors.get("title_text")).text:
if status == "":
temp = utils.get_div_links(x, "img", selectors)
if temp == "": # no image tag which means . it is not a life event
link = utils.get_div_links(x, "a", selectors).get_attribute("href")
post_type = "status update without text"
else:
post_type = "life event"
link = utils.get_div_links(x, "a", selectors).get_attribute("href")
status = utils.get_div_links(x, "a", selectors).text
else:
post_type = "status update"
if utils.get_div_links(x, "a", selectors) != "":
link = utils.get_div_links(x, "a", selectors).get_attribute("href")
elif title.text.find(" shared ") != -1:
x1, link = utils.get_title_links(title)
post_type = "shared " + x1
elif title.text.find(" at ") != -1 or title.text.find(" in ") != -1:
if title.text.find(" at ") != -1:
x1, link = utils.get_title_links(title)
post_type = "check in"
elif title.text.find(" in ") != 1:
status = utils.get_div_links(x, "a", selectors).text
elif title.text.find(" added ") != -1 and title.text.find("photo") != -1:
post_type = "added photo"
link = utils.get_div_links(x, "a", selectors).get_attribute("href")
elif title.text.find(" added ") != -1 and title.text.find("video") != -1:
post_type = "added video"
link = utils.get_div_links(x, "a", selectors).get_attribute("href")
else:
post_type = "others"
if not isinstance(title, str):
title = title.text
status = status.replace("\n", " ")
title = title.replace("\n", " ")
return link, status, title, post_type
def extract_and_write_group_posts(elements, filename):
try:
f = create_post_file(filename)
ids = []
for x in elements:
try:
# id
post_id = utils.get_group_post_id(x)
ids.append(post_id)
except Exception:
pass
total = len(ids)
i = 0
for post_id in ids:
i += 1
try:
add_group_post_to_file(f, filename, post_id, i, total, reload=True)
except ValueError:
pass
f.close()
except ValueError:
print("Exception (extract_and_write_posts)", "Status =", sys.exc_info()[0])
except Exception:
print("Exception (extract_and_write_posts)", "Status =", sys.exc_info()[0])
return
def add_group_post_to_file(f, filename, post_id, number=1, total=1, reload=False):
print("Scraping Post(" + post_id + "). " + str(number) + " of " + str(total))
photos_dir = os.path.dirname(filename)
if reload:
driver.get(utils.create_post_link(post_id, selectors))
line = get_group_post_as_line(post_id, photos_dir)
try:
f.writelines(line)
except Exception:
print("Posts: Could not map encoded characters")
def create_post_file(filename):
"""
Creates post file and header
:param filename:
:return: file
"""
f = open(filename, "w", newline="\r\n", encoding="utf-8")
f.writelines(
"TIME || TYPE || TITLE || STATUS || LINKS(Shared Posts/Shared Links etc) || SEE_MORE_LINK || POST_ID || "
"PHOTO || COMMENTS " + "\n"
)
return f
# -------------------------------------------------------------
# -------------------------------------------------------------
@ -215,11 +258,12 @@ def save_to_file(name, elements, status, current_section):
# status 2 = dealing with videos
# status 3 = dealing with about section
# status 4 = dealing with posts
# status 5 = dealing with group posts
try:
f = None # file pointer
if status != 4:
if status != 4 and status != 5:
f = open(name, "w", encoding="utf-8", newline="\r\n")
results = []
@ -363,6 +407,11 @@ def save_to_file(name, elements, status, current_section):
extract_and_write_posts(elements, name)
return
# dealing with Group Posts
elif status == 5:
extract_and_write_group_posts(elements, name)
return
"""Write results to file"""
if status == 0:
for i, _ in enumerate(results):
@ -404,14 +453,14 @@ def save_to_file(name, elements, status, current_section):
# -----------------------------------------------------------------------------
def scrape_data(user_id, scan_list, section, elements_path, save_status, file_names):
def scrape_data(url, scan_list, section, elements_path, save_status, file_names):
"""Given some parameters, this function can scrap friends/photos/videos/about/posts(statuses) of a profile"""
page = []
if save_status == 4:
page.append(user_id)
if save_status == 4 or save_status == 5:
page.append(url)
page += [user_id + s for s in section]
page += [url + s for s in section]
for i, _ in enumerate(scan_list):
try:
@ -431,6 +480,7 @@ def scrape_data(user_id, scan_list, section, elements_path, save_status, file_na
if save_status != 3:
utils.scroll(total_scrolls, driver, selectors, scroll_time)
pass
data = driver.find_elements_by_xpath(elements_path[i])
@ -477,52 +527,193 @@ def create_original_link(url):
return original_link
def scrap_profile(ids):
def scrap_profile():
data_folder = os.path.join(os.getcwd(), "data")
utils.create_folder(data_folder)
os.chdir(data_folder)
# execute for all profiles given in input.txt file
url = driver.current_url
user_id = create_original_link(url)
print("\nScraping:", user_id)
try:
target_dir = os.path.join(data_folder, user_id.split("/")[-1])
utils.create_folder(target_dir)
os.chdir(target_dir)
except Exception:
print("Some error occurred in creating the profile directory.")
os.chdir("../..")
return
to_scrap = ["Friends", "Photos", "Videos", "About", "Posts"]
for item in to_scrap:
print("----------------------------------------")
print("Scraping {}..".format(item))
if item == "Posts":
scan_list = [None]
elif item == "About":
scan_list = [None] * 7
else:
scan_list = params[item]["scan_list"]
section = params[item]["section"]
elements_path = params[item]["elements_path"]
file_names = params[item]["file_names"]
save_status = params[item]["save_status"]
scrape_data(user_id, scan_list, section, elements_path, save_status, file_names)
print("{} Done!".format(item))
print("Finished Scraping Profile " + str(user_id) + ".")
os.chdir("../..")
return
def get_comments():
comments = []
try:
data = driver.find_element_by_xpath(selectors.get("comment_section"))
reply_links = driver.find_elements_by_xpath(
selectors.get("more_comment_replies")
)
for link in reply_links:
try:
driver.execute_script("arguments[0].click();", link)
except Exception:
pass
see_more_links = driver.find_elements_by_xpath(
selectors.get("comment_see_more_link")
)
for link in see_more_links:
try:
driver.execute_script("arguments[0].click();", link)
except Exception:
pass
data = data.find_elements_by_xpath(selectors.get("comment"))
for d in data:
try:
author = d.find_element_by_xpath(selectors.get("comment_author")).text
text = d.find_element_by_xpath(selectors.get("comment_text")).text
replies = utils.get_replies(d, selectors)
comments.append([author, text, replies])
except Exception:
pass
except Exception:
pass
return comments
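# Rough shape of the list returned by get_comments() (author/text values are made up):
#   [["Jane Doe", "Nice post!", [["John Roe", "Agreed."]]], ...]
# i.e. each entry is [author, text, replies], and replies is a list of [author, text] pairs.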
def get_group_post_as_line(post_id, photos_dir):
try:
data = driver.find_element_by_xpath(selectors.get("single_post"))
time = utils.get_time(data)
title = utils.get_title(data, selectors).text
# link, status, title, type = get_status_and_title(title,data)
link = utils.get_div_links(data, "a", selectors)
if link != "":
link = link.get_attribute("href")
post_type = ""
status = '"' + utils.get_status(data, selectors).replace("\r\n", " ") + '"'
photos = utils.get_post_photos_links(data, selectors, photos_small_size)
comments = get_comments()
photos = image_downloader(photos, photos_dir)
line = (
str(time)
+ "||"
+ str(post_type)
+ "||"
+ str(title)
+ "||"
+ str(status)
+ "||"
+ str(link)
+ "||"
+ str(post_id)
+ "||"
+ str(photos)
+ "||"
+ str(comments)
+ "\n"
)
return line
except Exception:
return ""
def create_folders():
"""
Creates folder for saving data (profile, post or group) according to current driver url
Changes current dir to target_dir
:return: target_dir or None in case of failure
"""
folder = os.path.join(os.getcwd(), "data")
utils.create_folder(folder)
os.chdir(folder)
try:
item_id = get_item_id(driver.current_url)
target_dir = os.path.join(folder, item_id)
utils.create_folder(target_dir)
os.chdir(target_dir)
return target_dir
except Exception:
print("Some error occurred in creating the group directory.")
os.chdir("../..")
return None
def get_item_id(url):
"""
Gets item id from url
:param url: facebook url string
:return: item id or empty string in case of failure
"""
ret = ""
try:
link = create_original_link(url)
ret = link.split("/")[-1]
if ret.strip() == "":
ret = link.split("/")[-2]
except Exception as e:
print("Failed to get id: " + format(e))
return ret
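# Illustrative call, assuming create_original_link() returns the group link essentially unchanged:
#   get_item_id("https://www.facebook.com/groups/123456789/")  ->  "123456789"
# (the trailing slash makes split("/")[-1] empty, so the [-2] fallback is used)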
def scrape_group(url):
if create_folders() is None:
return
group_id = get_item_id(url)
# execute for all profiles given in input.txt file
for user_id in ids:
print("\nScraping:", group_id)
driver.get(user_id)
url = driver.current_url
user_id = create_original_link(url)
to_scrap = ["GroupPosts"] # , "Photos", "Videos", "About"]
for item in to_scrap:
print("----------------------------------------")
print("Scraping {}..".format(item))
print("\nScraping:", user_id)
if item == "Posts":
scan_list = [None]
elif item == "GroupPosts":
scan_list = [None]
elif item == "About":
scan_list = [None] * 7
else:
scan_list = params[item]["scan_list"]
try:
target_dir = os.path.join(folder, user_id.split("/")[-1])
utils.create_folder(target_dir)
os.chdir(target_dir)
except Exception:
print("Some error occurred in creating the profile directory.")
continue
section = params[item]["section"]
elements_path = params[item]["elements_path"]
file_names = params[item]["file_names"]
save_status = params[item]["save_status"]
to_scrap = ["Friends", "Photos", "Videos", "About", "Posts"]
for item in to_scrap:
print("----------------------------------------")
print("Scraping {}..".format(item))
scrape_data(url, scan_list, section, elements_path, save_status, file_names)
if item == "Posts":
scan_list = [None]
elif item == "About":
scan_list = [None] * 7
else:
scan_list = params[item]["scan_list"]
print("{} Done!".format(item))
section = params[item]["section"]
elements_path = params[item]["elements_path"]
file_names = params[item]["file_names"]
save_status = params[item]["save_status"]
scrape_data(
user_id, scan_list, section, elements_path, save_status, file_names
)
print("{} Done!".format(item))
print("\nProcess Completed.")
print("Finished Scraping Group " + str(group_id) + ".")
os.chdir("../..")
return
@ -532,13 +723,6 @@ def scrap_profile(ids):
# -----------------------------------------------------------------------------
def safe_find_element_by_id(driver, elem_id):
try:
return driver.find_element_by_id(elem_id)
except NoSuchElementException:
return None
def login(email, password):
""" Logging into our own profile """
@ -554,17 +738,11 @@ def login(email, password):
# options.add_argument("headless")
try:
platform_ = platform.system().lower()
driver = webdriver.Chrome(
executable_path=ChromeDriverManager().install(), options=options
)
except Exception:
print(
"Kindly replace the Chrome Web Driver with the latest one from "
"http://chromedriver.chromium.org/downloads "
"and also make sure you have the latest Chrome Browser version."
"\nYour OS: {}".format(platform_)
)
print("Error loading chrome webdriver " + sys.exc_info()[0])
exit(1)
fb_path = facebook_https_prefix + facebook_link_body
@ -583,7 +761,7 @@ def login(email, password):
driver.find_element_by_name("login").click()
# if your account uses multi factor authentication
mfa_code_input = safe_find_element_by_id(driver, "approvals_code")
mfa_code_input = utils.safe_find_element_by_id(driver, "approvals_code")
if mfa_code_input is None:
return
@ -592,8 +770,10 @@ def login(email, password):
driver.find_element_by_id("checkpointSubmitButton").click()
# there are so many screens asking you to verify things. Just skip them all
while safe_find_element_by_id(driver, "checkpointSubmitButton") is not None:
dont_save_browser_radio = safe_find_element_by_id(driver, "u_0_3")
while (
utils.safe_find_element_by_id(driver, "checkpointSubmitButton") is not None
):
dont_save_browser_radio = utils.safe_find_element_by_id(driver, "u_0_3")
if dont_save_browser_radio is not None:
dont_save_browser_radio.click()
@ -616,17 +796,34 @@ def scraper(**kwargs):
if ("password" not in cfg) or ("email" not in cfg):
print("Your email or password is missing. Kindly write them in credentials.txt")
exit(1)
ids = [
facebook_https_prefix + facebook_link_body + line.split("/")[-1]
for line in open("input.txt", newline="\n")
urls = [
facebook_https_prefix + facebook_link_body + get_item_id(line)
for line in open("input.txt", newline="\r\n")
if not line.lstrip().startswith("#") and not line.strip() == ""
]
if len(ids) > 0:
if len(urls) > 0:
print("\nStarting Scraping...")
login(cfg["email"], cfg["password"])
scrap_profile(ids)
for url in urls:
driver.get(url)
link_type = utils.identify_url(driver.current_url)
if link_type == 0:
scrap_profile()
elif link_type == 1:
# scrap_post(url)
pass
elif link_type == 2:
scrape_group(driver.current_url)
elif link_type == 3:
file_name = params["GroupPosts"]["file_names"][0]
item_id = get_item_id(driver.current_url)
if create_folders() is None:
continue
f = create_post_file(file_name)
add_group_post_to_file(f, file_name, item_id)
f.close()
os.chdir("../..")
driver.close()
else:
print("Input file is empty.")
@ -694,7 +891,6 @@ if __name__ == "__main__":
old_height = 0
driver = None
CHROMEDRIVER_BINARIES_FOLDER = "bin"
with open("selectors.json") as a, open("params.json") as b:
selectors = json.load(a)

View file

@ -1,10 +1,12 @@
import argparse
import os
import sys
from calendar import calendar
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.support.ui import WebDriverWait
# -----------------------------------------------------------------------------
#
# -----------------------------------------------------------------------------
@ -17,6 +19,12 @@ def to_bool(x):
raise argparse.ArgumentTypeError("Boolean value expected")
def create_post_link(post_id, selectors):
return (
selectors["facebook_https_prefix"] + selectors["facebook_link_body"] + post_id
)
# -----------------------------------------------------------------------------
#
# -----------------------------------------------------------------------------
@ -75,6 +83,64 @@ def get_status(x, selectors):
return status
def get_post_id(x):
post_id = -1
try:
post_id = x.get_attribute("id")
post_id = post_id.split(":")[-1]
except Exception:
pass
return post_id
def get_group_post_id(x):
post_id = -1
try:
post_id = x.get_attribute("id")
post_id = post_id.split("_")[-1]
if ";" in post_id:
post_id = post_id.split(";")
post_id = post_id[2]
else:
post_id = post_id.split(":")[0]
except Exception:
pass
return post_id
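# Walk-through with hypothetical element ids (not taken from real Facebook markup):
#   "abc_123456789:0"    -> split("_")[-1] = "123456789:0"   -> no ";", so split(":")[0] = "123456789"
#   "abc_12;34;56789;0"  -> split("_")[-1] = "12;34;56789;0" -> split(";")[2] = "56789"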
def get_photo_link(x, selectors, small_photo):
link = ""
try:
if small_photo:
link = x.find_element_by_xpath(
selectors.get("post_photo_small")
).get_attribute("src")
else:
link = x.get_attribute("data-ploi")
except NoSuchElementException:
try:
link = x.find_element_by_xpath(
selectors.get("post_photo_small_opt1")
).get_attribute("src")
except AttributeError:
pass
except Exception:
print("Exception (get_post_photo_link):", sys.exc_info()[0])
except Exception:
print("Exception (get_post_photo_link):", sys.exc_info()[0])
return link
def get_post_photos_links(x, selectors, small_photo):
links = []
photos = safe_find_elements_by_xpath(x, selectors.get("post_photos"))
if photos is not None:
for el in photos:
links.append(get_photo_link(el, selectors, small_photo))
return links
def get_div_links(x, tag, selectors):
try:
temp = x.find_element_by_xpath(selectors.get("temp"))
@ -137,3 +203,52 @@ def get_time(x):
finally:
return time
def identify_url(url):
"""
A possible way to identify the link.
Not Exhaustive!
:param url:
:return:
0 - Profile
1 - Profile post
2 - Group
3 - Group post
"""
if "groups" in url:
if "permalink" in url:
return 3
else:
return 2
elif "posts" in url:
return 1
else:
return 0
def safe_find_elements_by_xpath(driver, xpath):
try:
return driver.find_elements_by_xpath(xpath)
except NoSuchElementException:
return None
def get_replies(comment_element, selectors):
replies = []
data = comment_element.find_elements_by_xpath(selectors.get("comment_reply"))
for d in data:
try:
author = d.find_element_by_xpath(selectors.get("comment_author")).text
text = d.find_element_by_xpath(selectors.get("comment_text")).text
replies.append([author, text])
except Exception:
pass
return replies
def safe_find_element_by_id(driver, elem_id):
try:
return driver.find_element_by_id(elem_id)
except NoSuchElementException:
return None

View file

@ -17,5 +17,17 @@
"scroll_script": "window.scrollTo(0, document.body.scrollHeight);",
"title_text": "fb-timeline-cover-name",
"profilePicThumb": "profilePicThumb",
"fb_link": "https://en-gb.facebook.com/"
"fb_link": "https://en-gb.facebook.com/",
"single_post" : ".//div[contains(@class, '_5pcr')]",
"post_photos": ".//a[contains(@class, '_5dec') or contains(@class, '_4-eo')]",
"post_photo_small" : ".//img[contains(@class, '_46-i')]",
"post_photo_small_opt1" : ".//img[contains(@class, 'scaledImageFitWidth') or contains(@class, 'scaledImageFitHeight')]",
"comment_section" : ".//*[@class='commentable_item']",
"comment" : ".//div[@aria-label='Comment']",
"comment_author" : ".//a[@class='_6qw4']",
"comment_text" : ".//span[contains(@class,'_3l3x')]",
"more_comment_replies": ".//a[contains(@class,'_4sxc _42ft')]",
"comment_see_more_link" : ".//a[contains(@class,'_5v47 fss')]",
"comment_reply" : "..//..//div[@aria-label='Comment reply']"
}
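These keys are read with selectors.get(...) by the new group-post code; a minimal sketch of how they are consumed (driver and selectors set up as in the scraper):

    post = driver.find_element_by_xpath(selectors.get("single_post"))
    comment_block = driver.find_element_by_xpath(selectors.get("comment_section"))
    comments = comment_block.find_elements_by_xpath(selectors.get("comment"))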

0
test/__init__.py Normal file
View file

26
test/test_utils.py Normal file
View file

@ -0,0 +1,26 @@
from unittest import TestCase
from scraper import utils
class Test(TestCase):
def test_identify_url(self):
self.assertEqual(
utils.identify_url("https://www.facebook.com/groups/123456789694/?fref=nf"),
2,
)
self.assertEqual(
utils.identify_url("https://www.facebook.com/groups/123456789694"), 2
)
self.assertEqual(
utils.identify_url(
"https://www.facebook.com/groups/12345645546/permalink/213453415513/"
),
3,
)
self.assertEqual(
utils.identify_url("https://www.facebook.com/dfsdfsdf.sdfsdfs"), 0,
)
self.assertEqual(
utils.identify_url("https://www.facebook.com/sdfsdfsd/posts/123456784684"),
1,
)