Merge pull request #590 from sherlock-project/restructure_take2

Sherlock Restructure (Take 2)
Siddharth Dushantha 2020-05-07 15:04:11 +02:00 committed by GitHub
commit 7a0047b6d6
15 changed files with 903 additions and 402 deletions


@ -59,17 +59,16 @@ $ python3 -m pip install -r requirements.txt
## Usage
```
$ python3 sherlock.py --help
usage: sherlock.py [-h] [--version] [--verbose] [--rank]
[--folderoutput FOLDEROUTPUT] [--output OUTPUT] [--tor]
[--unique-tor] [--csv] [--site SITE_NAME]
[--proxy PROXY_URL] [--json JSON_FILE]
[--proxy_list PROXY_LIST] [--check_proxies CHECK_PROXY]
[--timeout TIMEOUT] [--print-found]
USERNAMES [USERNAMES ...]
```bash
$ python3 sherlock --help
usage: sherlock [-h] [--version] [--verbose] [--rank]
[--folderoutput FOLDEROUTPUT] [--output OUTPUT] [--tor]
[--unique-tor] [--csv] [--site SITE_NAME] [--proxy PROXY_URL]
[--json JSON_FILE] [--timeout TIMEOUT] [--print-found]
[--no-color] [--browse]
USERNAMES [USERNAMES ...]
Sherlock: Find Usernames Across Social Networks (Version 0.10.7)
Sherlock: Find Usernames Across Social Networks (Version 0.11.0)
positional arguments:
USERNAMES One or more usernames to check with social networks.
@ -101,15 +100,6 @@ optional arguments:
--json JSON_FILE, -j JSON_FILE
Load data from a JSON file or an online, valid, JSON
file.
--proxy_list PROXY_LIST, -pl PROXY_LIST
Make requests over a proxy randomly chosen from a list
generated from a .csv file.
--check_proxies CHECK_PROXY, -cp CHECK_PROXY
To be used with the '--proxy_list' parameter. The
script will check if the proxies supplied in the .csv
file are working and anonymous. Put 0 for no limit on
successfully checked proxies, or another number to
institute a limit.
--timeout TIMEOUT Time (in seconds) to wait for response to requests.
Default timeout of 60.0s. A longer timeout will be more
likely to get results from slow sites. On the other
@ -122,12 +112,12 @@ optional arguments:
To search for only one user:
```
python3 sherlock.py user123
python3 sherlock user123
```
To search for more than one user:
```
python3 sherlock.py user1 user2 user3
python3 sherlock user1 user2 user3
```
Accounts found will be stored in an individual text file with the corresponding username (e.g. ```user123.txt```).
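Each result file lists one URL per line for the sites where the username was claimed, followed by a summary line. A hypothetical `user123.txt` might look like:
```
https://www.github.com/user123
https://www.reddit.com/user/user123
Total Websites Username Detected On : 2
```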
@ -196,6 +186,7 @@ Sherlock. This invocation hides the progress text that Sherlock normally
outputs, and instead shows the verbose output of the tests.
```
$ cd sherlock
$ python3 -m unittest tests.all --buffer --verbose
```
@ -203,7 +194,7 @@ Note that we do currently have 100% test coverage. Unfortunately, some of
the sites that Sherlock checks are not always reliable, so it is common
to get response errors.
If some sites are failing due to conection problems (site is down, in maintainence, etc)
If some sites are failing due to connection problems (site is down, in maintenance, etc)
you can exclude them from tests by creating a `tests/.excluded_sites` file with a
list of sites to ignore (one site name per line).
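For example, a hypothetical `tests/.excluded_sites` excluding two unreliable sites would be:
```
AngelList
Alik.cz
```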


@ -1,4 +1,12 @@
{
"AdobeForums": {
"errorType": "status_code",
"rank": 59,
"url": "https://forums.adobe.com/people/{}",
"urlMain": "https://forums.adobe.com/",
"username_claimed": "jack",
"username_unclaimed": "noonewouldeverusethis77777"
},
"AngelList": {
"errorType": "status_code",
"rank": 5767,


@ -1,89 +0,0 @@
import csv
import requests
import time
from collections import namedtuple
from colorama import Fore, Style
def load_proxies_from_csv(path_to_list):
"""
A function which loads proxies from a .csv file, to a list.
Inputs: path to .csv file which contains proxies, described by fields: 'ip', 'port', 'protocol'.
Outputs: list containing proxies stored in named tuples.
"""
Proxy = namedtuple('Proxy', ['ip', 'port', 'protocol'])
with open(path_to_list, 'r') as csv_file:
csv_reader = csv.DictReader(csv_file)
proxies = [Proxy(line['ip'],line['port'],line['protocol']) for line in csv_reader]
return proxies
def check_proxy(proxy_ip, proxy_port, protocol):
"""
A function which tests the proxy by attempting
to make a request to the designated website.
We use 'wikipedia.org' as a test, since we can test the proxy anonymity
by checking if the returned 'X-Client-IP' header matches the proxy IP.
"""
full_proxy = f'{protocol}://{proxy_ip}:{proxy_port}'
proxies = {'http': full_proxy, 'https': full_proxy}
try:
r = requests.get('https://www.wikipedia.org',proxies=proxies, timeout=4)
return_proxy = r.headers['X-Client-IP']
if proxy_ip==return_proxy:
return True
else:
return False
except Exception:
return False
def check_proxy_list(proxy_list, max_proxies=None):
"""
A function which takes in one mandatory argument -> a proxy list in
the format returned by the function 'load_proxies_from_csv'.
It also takes an optional argument 'max_proxies', if the user wishes to
cap the number of validated proxies.
Each proxy is tested by the check_proxy function. Since each test is done on
'wikipedia.org', in order to be considerate to Wikipedia servers, we are not using any async modules,
but are sending successive requests each separated by at least 1 sec.
Outputs: list containing proxies stored in named tuples.
"""
print((Style.BRIGHT + Fore.GREEN + "[" +
Fore.YELLOW + "*" +
Fore.GREEN + "] Started checking proxies."))
working_proxies = []
# If the user has limited the number of proxies we need,
# the function will stop when the working_proxies
# loads the max number of requested proxies.
if max_proxies != None:
for proxy in proxy_list:
if len(working_proxies) < max_proxies:
time.sleep(1)
if check_proxy(proxy.ip,proxy.port,proxy.protocol) == True:
working_proxies.append(proxy)
else:
break
else:
for proxy in proxy_list:
time.sleep(1)
if check_proxy(proxy.ip,proxy.port,proxy.protocol) == True:
working_proxies.append(proxy)
if len(working_proxies) > 0:
print((Style.BRIGHT + Fore.GREEN + "[" +
Fore.YELLOW + "*" +
Fore.GREEN + "] Finished checking proxies."))
return working_proxies
else:
raise Exception("Found no working proxies.")


@ -6,6 +6,29 @@ They are listed here in the hope that things may change in the future
so they may be re-included.
## AdobeForums
As of 2020-04-12, all usernames are reported as available.
When I went to the site to see what was going on, usernames that I know
existed were redirecting to the main page.
I was able to see user profiles without logging in, but the URL was not
related to their user name. For example, user "tomke" went to
https://community.adobe.com/t5/user/viewprofilepage/user-id/10882613.
This can be detected, but it requires a different detection method.
```
"AdobeForums": {
"errorType": "status_code",
"rank": 59,
"url": "https://forums.adobe.com/people/{}",
"urlMain": "https://forums.adobe.com/",
"username_claimed": "jack",
"username_unclaimed": "noonewouldeverusethis77777"
},
```
## Basecamp
As of 2020-02-23, all usernames are reported as not existing.
@ -423,7 +446,7 @@ exists or not.
```
## InsaneJournal
As of 2020-02-23, InsaneJournal returns a false positive when given a username which contains a period.
Since we were not able to find the criteria for a valid username, the best thing to do for now is to remove it.

5
sherlock/__init__.py Normal file

@ -0,0 +1,5 @@
"""Sherlock Module
This module contains the main logic to search for usernames at social
networks.
"""

14
sherlock/__main__.py Normal file

@ -0,0 +1,14 @@
#! /usr/bin/env python3
"""
Sherlock: Find Usernames Across Social Networks Module
This module contains the main logic to search for usernames at social
networks.
"""
import sherlock
if __name__ == "__main__":
sherlock.main()

249
sherlock/notify.py Normal file

@ -0,0 +1,249 @@
"""Sherlock Notify Module
This module defines the objects for notifying the caller about the
results of queries.
"""
from result import QueryStatus
from colorama import Fore, Style, init
class QueryNotify():
"""Query Notify Object.
Base class that describes methods available to notify the results of
a query.
It is intended that other classes inherit from this base class and
override the methods to implement specific functionality.
"""
def __init__(self, result=None):
"""Create Query Notify Object.
Contains information about a specific method of notifying the results
of a query.
Keyword Arguments:
self -- This object.
result -- Object of type QueryResult() containing
results for this query.
Return Value:
Nothing.
"""
self.result = result
return
def start(self, message=None):
"""Notify Start.
Notify method for start of query. This method will be called before
any queries are performed. This method will typically be
overridden by higher level classes that will inherit from it.
Keyword Arguments:
self -- This object.
message -- Object that is used to give context to start
of query.
Default is None.
Return Value:
Nothing.
"""
return
def update(self, result):
"""Notify Update.
Notify method for query result. This method will typically be
overridden by higher level classes that will inherit from it.
Keyword Arguments:
self -- This object.
result -- Object of type QueryResult() containing
results for this query.
Return Value:
Nothing.
"""
self.result = result
return
def finish(self, message=None):
"""Notify Finish.
Notify method for finish of query. This method will be called after
all queries have been performed. This method will typically be
overridden by higher level classes that will inherit from it.
Keyword Arguments:
self -- This object.
message -- Object that is used to give context to start
of query.
Default is None.
Return Value:
Nothing.
"""
return
def __str__(self):
"""Convert Object To String.
Keyword Arguments:
self -- This object.
Return Value:
Nicely formatted string to get information about this object.
"""
result = str(self.result)
return result
class QueryNotifyPrint(QueryNotify):
"""Query Notify Print Object.
Query notify class that prints results.
"""
def __init__(self, result=None, verbose=False, print_found_only=False,
color=True):
"""Create Query Notify Print Object.
Contains information about a specific method of notifying the results
of a query.
Keyword Arguments:
self -- This object.
result -- Object of type QueryResult() containing
results for this query.
verbose -- Boolean indicating whether to give verbose output.
print_found_only -- Boolean indicating whether to only print found sites.
color -- Boolean indicating whether to color terminal output
Return Value:
Nothing.
"""
# Colorama module's initialization.
init(autoreset=True)
super().__init__(result)
self.verbose = verbose
self.print_found_only = print_found_only
self.color = color
return
def start(self, message):
"""Notify Start.
Will print the title to the standard output.
Keyword Arguments:
self -- This object.
message -- String containing username that the series
of queries are about.
Return Value:
Nothing.
"""
title = "Checking username"
if self.color:
print(Style.BRIGHT + Fore.GREEN + "[" +
Fore.YELLOW + "*" +
Fore.GREEN + f"] {title}" +
Fore.WHITE + f" {message}" +
Fore.GREEN + " on:")
else:
print(f"[*] {title} {message} on:")
return
def update(self, result):
"""Notify Update.
Will print the query result to the standard output.
Keyword Arguments:
self -- This object.
result -- Object of type QueryResult() containing
results for this query.
Return Value:
Nothing.
"""
self.result = result
if not self.verbose or self.result.query_time is None:
response_time_text = ""
else:
response_time_text = f" [{round(self.result.query_time * 1000)} ms]"
#Output to the terminal is desired.
if result.status == QueryStatus.CLAIMED:
if self.color:
print((Style.BRIGHT + Fore.WHITE + "[" +
Fore.GREEN + "+" +
Fore.WHITE + "]" +
response_time_text +
Fore.GREEN +
f" {self.result.site_name}: {self.result.site_url_user}"))
else:
print(f"[+]{response_time_text} {self.result.site_name}: {self.result.site_url_user}")
elif result.status == QueryStatus.AVAILABLE:
if not self.print_found_only:
if self.color:
print((Style.BRIGHT + Fore.WHITE + "[" +
Fore.RED + "-" +
Fore.WHITE + "]" +
response_time_text +
Fore.GREEN + f" {self.result.site_name}:" +
Fore.YELLOW + " Not Found!"))
else:
print(f"[-]{response_time_text} {self.result.site_name}: Not Found!")
elif result.status == QueryStatus.UNKNOWN:
if self.color:
print(Style.BRIGHT + Fore.WHITE + "[" +
Fore.RED + "-" +
Fore.WHITE + "]" +
Fore.GREEN + f" {self.result.site_name}:" +
Fore.RED + f" {self.result.context}" +
Fore.YELLOW + f" ")
else:
print(f"[-] {self.result.site_name}: {self.result.context} ")
elif result.status == QueryStatus.ILLEGAL:
if not self.print_found_only:
msg = "Illegal Username Format For This Site!"
if self.color:
print((Style.BRIGHT + Fore.WHITE + "[" +
Fore.RED + "-" +
Fore.WHITE + "]" +
Fore.GREEN + f" {self.result.site_name}:" +
Fore.YELLOW + f" {msg}"))
else:
print(f"[-] {self.result.site_name} {msg}")
else:
#It should be impossible to ever get here...
raise ValueError(f"Unknown Query Status '{str(result.status)}' for "
f"site '{self.result.site_name}'")
return
def __str__(self):
"""Convert Object To String.
Keyword Arguments:
self -- This object.
Return Value:
Nicely formatted string to get information about this object.
"""
result = str(self.result)
return result


@ -66,14 +66,6 @@
"username_claimed": "blue",
"username_unclaimed": "noonewouldeverusethis7"
},
"AdobeForums": {
"errorType": "status_code",
"rank": 59,
"url": "https://forums.adobe.com/people/{}",
"urlMain": "https://forums.adobe.com/",
"username_claimed": "jack",
"username_unclaimed": "noonewouldeverusethis77777"
},
"Alik.cz": {
"errorType": "status_code",
"rank": 624805,
@ -2380,7 +2372,7 @@
"url": "https://www.metacritic.com/user/{}",
"urlMain": "https://www.metacritic.com/",
"username_claimed": "blue",
"username_unclaimed": "noneownsthisusername"
"username_unclaimed": "noonewould"
},
"mixer.com": {
"errorType": "status_code",

88
sherlock/result.py Normal file

@ -0,0 +1,88 @@
"""Sherlock Result Module
This module defines various objects for recording the results of queries.
"""
from enum import Enum
class QueryStatus(Enum):
"""Query Status Enumeration.
Describes status of query about a given username.
"""
CLAIMED = "Claimed" #Username Detected
AVAILABLE = "Available" #Username Not Detected
UNKNOWN = "Unknown" #Error Occurred While Trying To Detect Username
ILLEGAL = "Illegal" #Username Not Allowable For This Site
def __str__(self):
"""Convert Object To String.
Keyword Arguments:
self -- This object.
Return Value:
Nicely formatted string to get information about this object.
"""
return self.value
class QueryResult():
"""Query Result Object.
Describes result of query about a given username.
"""
def __init__(self, username, site_name, site_url_user, status,
query_time=None, context=None):
"""Create Query Result Object.
Contains information about a specific method of detecting usernames on
a given type of web sites.
Keyword Arguments:
self -- This object.
username -- String indicating username that query result
was about.
site_name -- String which identifies site.
site_url_user -- String containing URL for username on site.
NOTE: The site may or may not exist: this
just indicates what the name would
be, if it existed.
status -- Enumeration of type QueryStatus() indicating
the status of the query.
query_time -- Time (in seconds) required to perform query.
Default of None.
context -- String indicating any additional context
about the query. For example, if there was
an error, this might indicate the type of
error that occurred.
Default of None.
Return Value:
Nothing.
"""
self.username = username
self.site_name = site_name
self.site_url_user = site_url_user
self.status = status
self.query_time = query_time
self.context = context
return
def __str__(self):
"""Convert Object To String.
Keyword Arguments:
self -- This object.
Return Value:
Nicely formatted string to get information about this object.
"""
status = str(self.status)
if self.context is not None:
#There is extra context information available about the results.
#Append it to the normal response text.
status += f" ({self.context})"
return status
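For illustration, constructing and printing a result with hypothetical values:
```python
from result import QueryResult, QueryStatus

result = QueryResult("user123", "GitHub",
                     "https://www.github.com/user123",
                     QueryStatus.CLAIMED,
                     query_time=0.42)
print(result)         # -> Claimed
print(result.status)  # -> Claimed (QueryStatus.__str__ returns the value)
```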

483
sherlock.py → sherlock/sherlock.py Executable file → Normal file

@ -13,144 +13,125 @@ import os
import platform
import re
import sys
import random
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from time import monotonic
from concurrent.futures import ThreadPoolExecutor
from time import time
import webbrowser
import requests
from colorama import Fore, Style, init
from requests_futures.sessions import FuturesSession
from torrequest import TorRequest
from load_proxies import load_proxies_from_csv, check_proxy_list
from result import QueryStatus
from result import QueryResult
from notify import QueryNotify
from notify import QueryNotifyPrint
from sites import SitesInformation
module_name = "Sherlock: Find Usernames Across Social Networks"
__version__ = "0.10.9"
__version__ = "0.11.0"
global proxy_list
proxy_list = []
class ElapsedFuturesSession(FuturesSession):
"""
Extends FutureSession to add a response time metric to each request.
This is taken (almost) directly from here: https://github.com/ross/requests-futures#working-in-the-background
"""
class SherlockFuturesSession(FuturesSession):
def request(self, method, url, hooks={}, *args, **kwargs):
start = time()
"""Request URL.
def timing(r, *args, **kwargs):
elapsed_sec = time() - start
r.elapsed = round(elapsed_sec * 1000)
This extends the FuturesSession request method to add a response
time metric to each request.
It is taken (almost) directly from the following requests-futures
example: https://github.com/ross/requests-futures#working-in-the-background
Keyword Arguments:
self -- This object.
method -- String containing method desired for request.
url -- String containing URL for request.
hooks -- Dictionary containing hooks to execute after
request finishes.
args -- Arguments.
kwargs -- Keyword arguments.
Return Value:
Request object.
"""
#Record the start time for the request.
start = monotonic()
def response_time(resp, *args, **kwargs):
"""Response Time Hook.
Keyword Arguments:
resp -- Response object.
args -- Arguments.
kwargs -- Keyword arguments.
Return Value:
N/A
"""
resp.elapsed = monotonic() - start
return
#Install hook to execute when response completes.
#Make sure that the time measurement hook is first, so we will not
#track any later hook's execution time.
try:
if isinstance(hooks['response'], (list, tuple)):
# needs to be first so we don't time other hooks execution
hooks['response'].insert(0, timing)
if isinstance(hooks['response'], list):
hooks['response'].insert(0, response_time)
elif isinstance(hooks['response'], tuple):
#Convert tuple to list and insert time measurement hook first.
hooks['response'] = list(hooks['response'])
hooks['response'].insert(0, response_time)
else:
hooks['response'] = [timing, hooks['response']]
#Must have previously contained a single hook function,
#so convert to list.
hooks['response'] = [response_time, hooks['response']]
except KeyError:
hooks['response'] = timing
#No response hook was already defined, so install it ourselves.
hooks['response'] = [response_time]
return super(ElapsedFuturesSession, self).request(method, url, hooks=hooks, *args, **kwargs)
return super(SherlockFuturesSession, self).request(method,
url,
hooks=hooks,
*args, **kwargs)
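# A minimal usage sketch (illustrative, not part of this commit). Note that
# the hook replaces requests' usual `elapsed` timedelta with a float number
# of seconds measured from `monotonic()`:
#
#   session = SherlockFuturesSession(max_workers=1)
#   response = session.get("https://www.wikipedia.org").result()
#   print(f"Request took {response.elapsed:.3f}s")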
def print_info(title, info, color=True):
if color:
print(Style.BRIGHT + Fore.GREEN + "[" +
Fore.YELLOW + "*" +
Fore.GREEN + f"] {title}" +
Fore.WHITE + f" {info}" +
Fore.GREEN + " on:")
else:
print(f"[*] {title} {info} on:")
def get_response(request_future, error_type, social_network):
def print_error(err, errstr, var, verbose=False, color=True):
if color:
print(Style.BRIGHT + Fore.WHITE + "[" +
Fore.RED + "-" +
Fore.WHITE + "]" +
Fore.RED + f" {errstr}" +
Fore.YELLOW + f" {err if verbose else var}")
else:
print(f"[-] {errstr} {err if verbose else var}")
def format_response_time(response_time, verbose):
return " [{} ms]".format(response_time) if verbose else ""
def print_found(social_network, url, response_time, verbose=False, color=True):
if color:
print((Style.BRIGHT + Fore.WHITE + "[" +
Fore.GREEN + "+" +
Fore.WHITE + "]" +
format_response_time(response_time, verbose) +
Fore.GREEN + f" {social_network}:"), url)
else:
print(f"[+]{format_response_time(response_time, verbose)} {social_network}: {url}")
def print_not_found(social_network, response_time, verbose=False, color=True):
if color:
print((Style.BRIGHT + Fore.WHITE + "[" +
Fore.RED + "-" +
Fore.WHITE + "]" +
format_response_time(response_time, verbose) +
Fore.GREEN + f" {social_network}:" +
Fore.YELLOW + " Not Found!"))
else:
print(f"[-]{format_response_time(response_time, verbose)} {social_network}: Not Found!")
def print_invalid(social_network, msg, color=True):
"""Print invalid search result."""
if color:
print((Style.BRIGHT + Fore.WHITE + "[" +
Fore.RED + "-" +
Fore.WHITE + "]" +
Fore.GREEN + f" {social_network}:" +
Fore.YELLOW + f" {msg}"))
else:
print(f"[-] {social_network} {msg}")
def get_response(request_future, error_type, social_network, verbose=False, retry_no=None, color=True):
global proxy_list
#Default for Response object if some failure occurs.
response = None
error_context = "General Unknown Error"
exception_text = None
try:
rsp = request_future.result()
if rsp.status_code:
return rsp, error_type, rsp.elapsed
response = request_future.result()
if response.status_code:
#status code exists in response object
error_context = None
except requests.exceptions.HTTPError as errh:
print_error(errh, "HTTP Error:", social_network, verbose, color)
# In case our proxy fails, we retry with another proxy.
error_context = "HTTP Error"
exception_text = str(errh)
except requests.exceptions.ProxyError as errp:
if retry_no>0 and len(proxy_list)>0:
#Selecting the new proxy.
new_proxy = random.choice(proxy_list)
new_proxy = f'{new_proxy.protocol}://{new_proxy.ip}:{new_proxy.port}'
print(f'Retrying with {new_proxy}')
request_future.proxy = {'http':new_proxy,'https':new_proxy}
get_response(request_future,error_type, social_network, verbose,retry_no=retry_no-1, color=color)
else:
print_error(errp, "Proxy error:", social_network, verbose, color)
error_context = "Proxy Error"
exception_text = str(errp)
except requests.exceptions.ConnectionError as errc:
print_error(errc, "Error Connecting:", social_network, verbose, color)
error_context = "Error Connecting"
exception_text = str(errc)
except requests.exceptions.Timeout as errt:
print_error(errt, "Timeout Error:", social_network, verbose, color)
error_context = "Timeout Error"
exception_text = str(errt)
except requests.exceptions.RequestException as err:
print_error(err, "Unknown error:", social_network, verbose, color)
return None, "", -1
error_context = "Unknown Error"
exception_text = str(err)
return response, error_context, exception_text
def sherlock(username, site_data, verbose=False, tor=False, unique_tor=False,
proxy=None, print_found_only=False, timeout=None, color=True):
def sherlock(username, site_data, query_notify,
tor=False, unique_tor=False,
proxy=None, timeout=None):
"""Run Sherlock Analysis.
Checks for existence of username on various social media sites.
@ -159,13 +140,14 @@ def sherlock(username, site_data, verbose=False, tor=False, unique_tor=False,
username -- String indicating username that report
should be created against.
site_data -- Dictionary containing all of the site data.
verbose -- Boolean indicating whether to give verbose output.
query_notify -- Object with base type of QueryNotify().
This will be used to notify the caller about
query results.
tor -- Boolean indicating whether to use a tor circuit for the requests.
unique_tor -- Boolean indicating whether to use a new tor circuit for each request.
proxy -- String indicating the proxy URL
timeout -- Time in seconds to wait before timing out request.
Default is no timeout.
color -- Boolean indicating whether to color terminal output
Return Value:
Dictionary containing results from report. Key of dictionary is the name
@ -173,13 +155,16 @@ def sherlock(username, site_data, verbose=False, tor=False, unique_tor=False,
the following keys:
url_main: URL of main site.
url_user: URL of user on site (if account exists).
exists: String indicating results of test for account existence.
status: QueryResult() object indicating results of test for
account existence.
http_status: HTTP status code of query which checked for existence on
site.
response_text: Text that came back from request. May be None if
there was an HTTP error when checking for existence.
"""
print_info("Checking username", username, color)
#Notify caller that we are starting the query.
query_notify.start(username)
# Create session based on request methodology
if tor or unique_tor:
@ -199,8 +184,9 @@ def sherlock(username, site_data, verbose=False, tor=False, unique_tor=False,
max_workers=len(site_data)
#Create multi-threaded session for all requests.
session = ElapsedFuturesSession(max_workers=max_workers,
session=underlying_session)
session = SherlockFuturesSession(max_workers=max_workers,
session=underlying_session)
# Results from analysis of all sites
results_total = {}
@ -224,21 +210,23 @@ def sherlock(username, site_data, verbose=False, tor=False, unique_tor=False,
# Override/append any extra headers required by a given site.
headers.update(net_info["headers"])
# URL of user on site (if it exists)
url = net_info["url"].format(username)
# Don't make request if username is invalid for the site
regex_check = net_info.get("regexCheck")
if regex_check and re.search(regex_check, username) is None:
# No need to do the check at the site: this user name is not allowed.
if not print_found_only:
print_invalid(social_network, "Illegal Username Format For This Site!", color)
results_site["exists"] = "illegal"
results_site['status'] = QueryResult(username,
social_network,
url,
QueryStatus.ILLEGAL)
results_site["url_user"] = ""
results_site['http_status'] = ""
results_site['response_text'] = ""
results_site['response_time_ms'] = ""
query_notify.update(results_site['status'])
else:
# URL of user on site (if it exists)
url = net_info["url"].format(username)
results_site["url_user"] = url
url_probe = net_info.get("urlProbe")
if url_probe is None:
@ -298,58 +286,72 @@ def sherlock(username, site_data, verbose=False, tor=False, unique_tor=False,
# Retrieve other site information again
url = results_site.get("url_user")
exists = results_site.get("exists")
if exists is not None:
status = results_site.get("status")
if status is not None:
# We have already determined the user doesn't exist here
continue
# Get the expected error type
error_type = net_info["errorType"]
# Default data in case there are any failures in doing a request.
http_status = "?"
response_text = ""
# Retrieve future and ensure it has finished
future = net_info["request_future"]
r, error_type, response_time = get_response(request_future=future,
error_type=error_type,
social_network=social_network,
verbose=verbose,
retry_no=3,
color=color)
r, error_text, exception_text = get_response(request_future=future,
error_type=error_type,
social_network=social_network)
#Get response time for response of our request.
try:
response_time = r.elapsed
except AttributeError:
response_time = None
# Attempt to get request information
try:
http_status = r.status_code
except:
pass
http_status = "?"
try:
response_text = r.text.encode(r.encoding)
except:
pass
response_text = ""
if error_type == "message":
if error_text is not None:
result = QueryResult(username,
social_network,
url,
QueryStatus.UNKNOWN,
query_time=response_time,
context=error_text)
elif error_type == "message":
error = net_info.get("errorMsg")
# Checks if the error message is in the HTML
if error not in r.text:
print_found(social_network, url, response_time, verbose, color)
exists = "yes"
result = QueryResult(username,
social_network,
url,
QueryStatus.CLAIMED,
query_time=response_time)
else:
if not print_found_only:
print_not_found(social_network, response_time, verbose, color)
exists = "no"
result = QueryResult(username,
social_network,
url,
QueryStatus.AVAILABLE,
query_time=response_time)
elif error_type == "status_code":
# Checks if the status code of the response is 2XX
if 200 <= r.status_code < 300:
print_found(social_network, url, response_time, verbose, color)
exists = "yes"
result = QueryResult(username,
social_network,
url,
QueryStatus.CLAIMED,
query_time=response_time)
else:
if not print_found_only:
print_not_found(social_network, response_time, verbose, color)
exists = "no"
result = QueryResult(username,
social_network,
url,
QueryStatus.AVAILABLE,
query_time=response_time)
elif error_type == "response_url":
# For this detection method, we have turned off the redirect.
# So, there is no need to check the response URL: it will always
@ -357,29 +359,39 @@ def sherlock(username, site_data, verbose=False, tor=False, unique_tor=False,
# code indicates that the request was successful (i.e. no 404, or
# forward to some odd redirect).
if 200 <= r.status_code < 300:
#
print_found(social_network, url, response_time, verbose, color)
exists = "yes"
result = QueryResult(username,
social_network,
url,
QueryStatus.CLAIMED,
query_time=response_time)
else:
if not print_found_only:
print_not_found(social_network, response_time, verbose, color)
exists = "no"
result = QueryResult(username,
social_network,
url,
QueryStatus.AVAILABLE,
query_time=response_time)
else:
#It should be impossible to ever get here...
raise ValueError(f"Unknown Error Type '{error_type}' for "
f"site '{social_network}'")
elif error_type == "":
if not print_found_only:
print_invalid(social_network, "Error!", color)
exists = "error"
# Save exists flag
results_site['exists'] = exists
#Notify caller about results of query.
query_notify.update(result)
# Save status of request
results_site['status'] = result
# Save results from request
results_site['http_status'] = http_status
results_site['response_text'] = response_text
results_site['response_time_ms'] = response_time
# Add this site's results into final dictionary with all of the other results.
results_total[social_network] = results_site
#Notify caller that all queries are finished.
query_notify.finish()
return results_total
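# Putting it together, a hypothetical caller of sherlock() under the new
# interface (performs live network requests against every configured site):
#
#   from notify import QueryNotifyPrint
#   from sites import SitesInformation
#
#   sites = SitesInformation()
#   site_data = {site.name: site.information for site in sites}
#   results = sherlock("user123", site_data,
#                      QueryNotifyPrint(print_found_only=True),
#                      timeout=10)
#   for site_name, info in results.items():
#       print(site_name, info["status"])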
@ -409,8 +421,6 @@ def timeout_check(value):
def main():
# Colorama module's initialization.
init(autoreset=True)
version_string = f"%(prog)s {__version__}\n" + \
f"{requests.__description__}: {requests.__version__}\n" + \
@ -456,18 +466,8 @@ def main():
help="Make requests over a proxy. e.g. socks5://127.0.0.1:1080"
)
parser.add_argument("--json", "-j", metavar="JSON_FILE",
dest="json_file", default="data.json",
dest="json_file", default=None,
help="Load data from a JSON file or an online, valid, JSON file.")
parser.add_argument("--proxy_list", "-pl", metavar='PROXY_LIST',
action="store", dest="proxy_list", default=None,
help="Make requests over a proxy randomly chosen from a list generated from a .csv file."
)
parser.add_argument("--check_proxies", "-cp", metavar='CHECK_PROXY',
action="store", dest="check_prox", default=None,
help="To be used with the '--proxy_list' parameter. "
"The script will check if the proxies supplied in the .csv file are working and anonymous."
"Put 0 for no limit on successfully checked proxies, or another number to institute a limit."
)
parser.add_argument("--timeout",
action="store", metavar='TIMEOUT',
dest="timeout", type=timeout_check, default=None,
@ -498,40 +498,13 @@ def main():
# Argument check
# TODO regex check on args.proxy
if args.tor and (args.proxy != None or args.proxy_list != None):
raise Exception("Tor and Proxy cannot be set in the meantime.")
# Proxy argument check.
# Does not necessarily need to throw an error,
# since we could join the single proxy with the ones generated from the .csv,
# but it seems unnecessarily complex at this time.
if args.proxy != None and args.proxy_list != None:
raise Exception("A single proxy cannot be used along with proxy list.")
if args.tor and (args.proxy != None):
raise Exception("Tor and Proxy cannot be set at the same time.")
# Make prompts
if args.proxy != None:
print("Using the proxy: " + args.proxy)
global proxy_list
if args.proxy_list != None:
print_info("Loading proxies from", args.proxy_list, not args.color)
proxy_list = load_proxies_from_csv(args.proxy_list)
# Checking if proxies should be checked for anonymity.
if args.check_prox != None and args.proxy_list != None:
try:
limit = int(args.check_prox)
if limit == 0:
proxy_list = check_proxy_list(proxy_list)
elif limit > 0:
proxy_list = check_proxy_list(proxy_list, limit)
else:
raise ValueError
except ValueError:
raise Exception("Parameter --check_proxies/-cp must be a positive integer.")
if args.tor or args.unique_tor:
print("Using Tor to make requests")
print("Warning: some websites might refuse connecting over Tor, so note that using this option might increase connection errors.")
@ -546,41 +519,20 @@ def main():
print("You can only use --output with a single username")
sys.exit(1)
response_json_online = None
site_data_all = None
# Try to load json from website.
#Create object with all information about sites we are aware of.
try:
response_json_online = requests.get(url=args.json_file)
except requests.exceptions.MissingSchema: # In case the schema is wrong it's because it may not be a website
pass
sites = SitesInformation(args.json_file)
except Exception as error:
print(f"ERROR: {error}")
sys.exit(1)
# Check if the response is appropriate.
if response_json_online is not None and response_json_online.status_code == 200:
# Since we got data from a website, try to load json and exit if parsing fails.
try:
site_data_all = response_json_online.json()
except ValueError:
print("Invalid JSON from website!")
sys.exit(1)
pass
data_file_path = os.path.join(os.path.dirname(
os.path.realpath(__file__)), args.json_file)
# This will be none if the request had a missing schema
if site_data_all is None:
# Check if the file exists otherwise exit.
if not os.path.exists(data_file_path):
print("JSON file doesn't exist.")
print(
"If this is not a file but a website, make sure you have appended http:// or https://.")
sys.exit(1)
else:
raw = open(data_file_path, "r", encoding="utf-8")
try:
site_data_all = json.load(raw)
except:
print("Invalid JSON loaded from file.")
#Create original dictionary from SitesInformation() object.
#Eventually, the rest of the code will be updated to use the new object
#directly, but this will glue the two pieces together.
site_data_all = {}
for site in sites:
site_data_all[site.name] = site.information
if args.site_list is None:
# Not desired to look at a sub-set of sites
@ -612,49 +564,43 @@ def main():
for site in ranked_sites:
site_data[site] = site_dataCpy.get(site)
#Create notify object for query results.
query_notify = QueryNotifyPrint(result=None,
verbose=args.verbose,
print_found_only=args.print_found_only,
color=not args.no_color)
# Run report on all specified users.
for username in args.username:
print()
if args.output:
file = open(args.output, "w", encoding="utf-8")
elif args.folderoutput: # In case we handle multiple usernames in a targeted folder.
# If the folder doesn't exist, create it first
if not os.path.isdir(args.folderoutput):
os.mkdir(args.folderoutput)
file = open(os.path.join(args.folderoutput,
username + ".txt"), "w", encoding="utf-8")
else:
file = open(username + ".txt", "w", encoding="utf-8")
# We try to add a random member of the 'proxy_list' var as the proxy of the request.
# If we can't access the list or it is empty, we proceed with args.proxy as the proxy.
try:
random_proxy = random.choice(proxy_list)
proxy = f'{random_proxy.protocol}://{random_proxy.ip}:{random_proxy.port}'
except (NameError, IndexError):
proxy = args.proxy
results = sherlock(username,
site_data,
verbose=args.verbose,
query_notify,
tor=args.tor,
unique_tor=args.unique_tor,
proxy=args.proxy,
print_found_only=args.print_found_only,
timeout=args.timeout,
color=not args.no_color)
timeout=args.timeout)
exists_counter = 0
for website_name in results:
dictionary = results[website_name]
if dictionary.get("exists") == "yes":
exists_counter += 1
file.write(dictionary["url_user"] + "\n")
if args.browse :
webbrowser.open(dictionary["url_user"])
file.write(f"Total Websites Username Detected On : {exists_counter}")
file.close()
if args.output:
result_file = args.output
elif args.folderoutput:
# The usernames results should be stored in a targeted folder.
# If the folder doesn't exist, create it first
os.makedirs(args.folderoutput, exist_ok=True)
result_file = os.path.join(args.folderoutput, f"{username}.txt")
else:
result_file = f"{username}.txt"
with open(result_file, "w", encoding="utf-8") as file:
exists_counter = 0
for website_name in results:
dictionary = results[website_name]
if dictionary.get("status").status == QueryStatus.CLAIMED:
exists_counter += 1
file.write(dictionary["url_user"] + "\n")
file.write(f"Total Websites Username Detected On : {exists_counter}")
if args.csv == True:
with open(username + ".csv", "w", newline='', encoding="utf-8") as csv_report:
@ -665,17 +611,20 @@ def main():
'url_user',
'exists',
'http_status',
'response_time_ms'
'response_time_s'
]
)
for site in results:
response_time_s = results[site]['status'].query_time
if response_time_s is None:
response_time_s = ""
writer.writerow([username,
site,
results[site]['url_main'],
results[site]['url_user'],
results[site]['exists'],
str(results[site]['status'].status),
results[site]['http_status'],
results[site]['response_time_ms']
response_time_s
]
)
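The resulting CSV report then contains one row per site, with the column order set by the writer above; an illustrative sample with hypothetical values:
```
username,name,url_main,url_user,exists,http_status,response_time_s
user123,GitHub,https://www.github.com/,https://www.github.com/user123,Claimed,200,0.42
user123,AngelList,https://angel.co/,https://angel.co/user123,Available,404,0.91
```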

247
sherlock/sites.py Normal file

@ -0,0 +1,247 @@
"""Sherlock Sites Information Module
This module supports storing information about web sites.
This is the raw data that will be used to search for usernames.
"""
import logging
import os
import json
import operator
import requests
import sys
class SiteInformation():
def __init__(self, name, url_home, url_username_format, popularity_rank,
username_claimed, username_unclaimed,
information):
"""Create Site Information Object.
Contains information about a specific web site.
Keyword Arguments:
self -- This object.
name -- String which identifies site.
url_home -- String containing URL for home of site.
url_username_format -- String containing URL for Username format
on site.
NOTE: The string should contain the
token "{}" where the username should
be substituted. For example, a string
of "https://somesite.com/users/{}"
indicates that the individual
usernames would show up under the
"https://somesite.com/users/" area of
the web site.
popularity_rank -- Integer indicating popularity of site.
In general, smaller numbers mean more
popular ("0" or None means ranking
information not available).
username_claimed -- String containing username which is known
to be claimed on web site.
username_unclaimed -- String containing username which is known
to be unclaimed on web site.
information -- Dictionary containing all known information
about web site.
NOTE: Custom information about how to
actually detect the existence of the
username will be included in this
dictionary. This information will
be needed by the detection method,
but it is only recorded in this
object for future use.
Return Value:
Nothing.
"""
self.name = name
self.url_home = url_home
self.url_username_format = url_username_format
if (popularity_rank is None) or (popularity_rank == 0):
#We do not know the popularity, so make site go to bottom of list.
popularity_rank = sys.maxsize
self.popularity_rank = popularity_rank
self.username_claimed = username_claimed
self.username_unclaimed = username_unclaimed
self.information = information
return
def __str__(self):
"""Convert Object To String.
Keyword Arguments:
self -- This object.
Return Value:
Nicely formatted string to get information about this object.
"""
return f"{self.name} ({self.url_home})"
class SitesInformation():
def __init__(self, data_file_path=None):
"""Create Sites Information Object.
Contains information about all supported web sites.
Keyword Arguments:
self -- This object.
data_file_path -- String which indicates path to data file.
The file name must end in ".json".
There are 3 possible formats:
* Absolute File Format
For example, "c:/stuff/data.json".
* Relative File Format
The current working directory is used
as the context.
For example, "data.json".
* URL Format
For example,
"https://example.com/data.json", or
"http://example.com/data.json".
An exception will be thrown if the path
to the data file is not in the expected
format, or if there was any problem loading
the file.
If this option is not specified, then a
default site list will be used.
Return Value:
Nothing.
"""
if data_file_path is None:
#Use internal default.
data_file_path = \
os.path.join(os.path.dirname(os.path.realpath(__file__)),
"resources/data.json"
)
#Ensure that specified data file has correct extension.
if ".json" != data_file_path[-5:].lower():
raise FileNotFoundError(f"Incorrect JSON file extension for "
f"data file '{data_file_path}'."
)
if ( ("http://" == data_file_path[:7].lower()) or
("https://" == data_file_path[:8].lower())
):
#Reference is to a URL.
try:
response = requests.get(url=data_file_path)
except Exception as error:
raise FileNotFoundError(f"Problem while attempting to access "
f"data file URL '{data_file_path}': "
f"{str(error)}"
)
if response.status_code == 200:
try:
site_data = response.json()
except Exception as error:
raise ValueError(f"Problem parsing json contents at "
f"'{data_file_path}': {str(error)}."
)
else:
raise FileNotFoundError(f"Bad response while accessing "
f"data file URL '{data_file_path}'."
)
else:
#Reference is to a file.
try:
with open(data_file_path, "r", encoding="utf-8") as file:
try:
site_data = json.load(file)
except Exception as error:
raise ValueError(f"Problem parsing json contents at "
f"'{data_file_path}': {str(error)}."
)
except FileNotFoundError as error:
raise FileNotFoundError(f"Problem while attempting to access "
f"data file '{data_file_path}'."
)
self.sites = {}
#Add all of site information from the json file to internal site list.
for site_name in site_data:
try:
#If popularity unknown, make site be at bottom of list.
popularity_rank = site_data[site_name].get("rank", sys.maxsize)
self.sites[site_name] = \
SiteInformation(site_name,
site_data[site_name]["urlMain"],
site_data[site_name]["url"],
popularity_rank,
site_data[site_name]["username_claimed"],
site_data[site_name]["username_unclaimed"],
site_data[site_name]
)
except KeyError as error:
raise ValueError(f"Problem parsing json contents at "
f"'{data_file_path}': "
f"Missing attribute {str(error)}."
)
return
def site_name_list(self, popularity_rank=False):
"""Get Site Name List.
Keyword Arguments:
self -- This object.
popularity_rank -- Boolean indicating if list should be sorted
by popularity rank.
Default value is False.
NOTE: List is sorted in ascending
alphabetical order if popularity rank
is not requested.
Return Value:
List of strings containing names of sites.
"""
if popularity_rank:
#Sort in ascending popularity rank order.
site_rank_name = \
sorted([(site.popularity_rank,site.name) for site in self],
key=operator.itemgetter(0)
)
site_names = [name for _,name in site_rank_name]
else:
#Sort in ascending alphabetical order.
site_names = sorted([site.name for site in self], key=str.lower)
return site_names
def __iter__(self):
"""Iterator For Object.
Keyword Arguments:
self -- This object.
Return Value:
Iterator for sites object.
"""
for site_name in self.sites:
yield self.sites[site_name]
def __len__(self):
"""Length For Object.
Keyword Arguments:
self -- This object.
Return Value:
Length of sites object.
"""
return len(self.sites)
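A short usage sketch for the new module (assuming the bundled default data file):
```python
from sites import SitesInformation

sites = SitesInformation()  # loads sherlock/resources/data.json by default
print(f"{len(sites)} sites loaded")

# Five most popular sites first (ascending popularity rank).
for name in sites.site_name_list(popularity_rank=True)[:5]:
    print(name)
```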


@ -21,7 +21,7 @@ class SherlockDetectTests(SherlockBaseTest):
Will trigger an assert if detection mechanism did not work as expected.
"""
site = 'Instagram'
site = 'Instructables'
site_data = self.site_data_all[site]
#Ensure that the site's detection method has not changed.
@ -48,7 +48,7 @@ class SherlockDetectTests(SherlockBaseTest):
Will trigger an assert if detection mechanism did not work as expected.
"""
site = 'Instagram'
site = 'Instructables'
site_data = self.site_data_all[site]
#Ensure that the site's detection method has not changed.


@ -7,6 +7,10 @@ import os
import os.path
import unittest
import sherlock
from result import QueryStatus
from result import QueryResult
from notify import QueryNotify
from sites import SitesInformation
import warnings
@ -27,10 +31,16 @@ class SherlockBaseTest(unittest.TestCase):
#TODO: Figure out how to fix the code so this is not needed.
warnings.simplefilter("ignore", ResourceWarning)
# Load the data file with all site information.
data_file_path = os.path.join(os.path.dirname(os.path.realpath(sherlock.__file__)), "data.json")
with open(data_file_path, "r", encoding="utf-8") as raw:
self.site_data_all = json.load(raw)
#Create object with all information about sites we are aware of.
sites = SitesInformation()
#Create original dictionary from SitesInformation() object.
#Eventually, the rest of the code will be updated to use the new object
#directly, but this will glue the two pieces together.
site_data_all = {}
for site in sites:
site_data_all[site.name] = site.information
self.site_data_all = site_data_all
# Load excluded sites list, if any
excluded_sites_path = os.path.join(os.path.dirname(os.path.realpath(sherlock.__file__)), "tests/.excluded_sites")
@ -40,10 +50,13 @@ class SherlockBaseTest(unittest.TestCase):
except FileNotFoundError:
self.excluded_sites = []
self.verbose=False
#Create notify object for query results.
self.query_notify = QueryNotify()
self.tor=False
self.unique_tor=False
self.timeout=None
self.skip_error_sites=True
return
@ -94,16 +107,16 @@ class SherlockBaseTest(unittest.TestCase):
site_data = self.site_data_filter(site_list)
if exist_check:
check_type_text = "exists"
exist_result_desired = "yes"
check_type_text = "claimed"
exist_result_desired = QueryStatus.CLAIMED
else:
check_type_text = "does not exist"
exist_result_desired = "no"
check_type_text = "available"
exist_result_desired = QueryStatus.AVAILABLE
for username in username_list:
results = sherlock.sherlock(username,
site_data,
verbose=self.verbose,
self.query_notify,
tor=self.tor,
unique_tor=self.unique_tor,
timeout=self.timeout
@ -112,7 +125,18 @@ class SherlockBaseTest(unittest.TestCase):
with self.subTest(f"Checking Username '{username}' "
f"{check_type_text} on Site '{site}'"
):
self.assertEqual(result['exists'], exist_result_desired)
if (self.skip_error_sites and
result['status'].status == QueryStatus.UNKNOWN):
#Some error connecting to site.
self.skipTest(f"Skipping Username '{username}' "
f"{check_type_text} on Site '{site}': "
f"Site returned error status."
)
self.assertEqual(exist_result_desired,
result['status'].status)
return


@ -35,7 +35,7 @@ parser.add_argument("--rank","-r",
)
args = parser.parse_args()
with open("data.json", "r", encoding="utf-8") as data_file:
with open("sherlock/resources/data.json", "r", encoding="utf-8") as data_file:
data = json.load(data_file)
with open("sites.md", "w") as site_file: