support for ipinfo and ip to location ordering

This commit is contained in:
Emil Lerch 2020-07-19 11:57:18 -07:00
parent f87a825d8b
commit 3c16ac7993
No known key found for this signature in database
GPG key ID: A7B62D657EF764F8
2 changed files with 86 additions and 32 deletions

View file

@ -83,6 +83,11 @@ PLAIN_TEXT_AGENTS = [
PLAIN_TEXT_PAGES = [':help', ':bash.function', ':translation', ':iterm2'] PLAIN_TEXT_PAGES = [':help', ':bash.function', ':translation', ':iterm2']
_IPLOCATION_ORDER = os.environ.get(
"WTTR_IPLOCATION_ORDER",
'geoip,ip2location,ipinfo')
IPLOCATION_ORDER = _IPLOCATION_ORDER.split(',')
_IP2LOCATION_KEY_FILE = os.environ.get( _IP2LOCATION_KEY_FILE = os.environ.get(
"WTTR_IP2LOCATION_KEY_FILE", "WTTR_IP2LOCATION_KEY_FILE",
os.environ['HOME'] + '/.ip2location.key') os.environ['HOME'] + '/.ip2location.key')
@ -90,6 +95,13 @@ IP2LOCATION_KEY = None
if os.path.exists(_IP2LOCATION_KEY_FILE): if os.path.exists(_IP2LOCATION_KEY_FILE):
IP2LOCATION_KEY = open(_IP2LOCATION_KEY_FILE, 'r').read().strip() IP2LOCATION_KEY = open(_IP2LOCATION_KEY_FILE, 'r').read().strip()
_IPINFO_KEY_FILE = os.environ.get(
"WTTR_IPINFO_KEY_FILE",
os.environ['HOME'] + '/.ipinfo.key')
IPINFO_TOKEN = None
if os.path.exists(_IPINFO_KEY_FILE):
IPINFO_TOKEN = open(_IPINFO_KEY_FILE, 'r').read().strip()
_WWO_KEY_FILE = os.environ.get( _WWO_KEY_FILE = os.environ.get(
"WTTR_WWO_KEY_FILE", "WTTR_WWO_KEY_FILE",
os.environ['HOME'] + '/.wwo.key') os.environ['HOME'] + '/.wwo.key')

View file

@ -17,7 +17,7 @@ import requests
import geoip2.database import geoip2.database
from globals import GEOLITE, GEOLOCATOR_SERVICE, IP2LCACHE, IP2LOCATION_KEY, NOT_FOUND_LOCATION, \ from globals import GEOLITE, GEOLOCATOR_SERVICE, IP2LCACHE, IP2LOCATION_KEY, NOT_FOUND_LOCATION, \
ALIASES, BLACKLIST, IATA_CODES_FILE ALIASES, BLACKLIST, IATA_CODES_FILE, IPLOCATION_ORDER, IPINFO_TOKEN
GEOIP_READER = geoip2.database.Reader(GEOLITE) GEOIP_READER = geoip2.database.Reader(GEOLITE)
@ -88,9 +88,15 @@ def geolocator(location):
return None return None
def ip2location(ip_addr):
"Convert IP address `ip_addr` to a location name"
def ipcachewrite(ip_addr, location):
cached = os.path.join(IP2LCACHE, ip_addr)
if not os.path.exists(IP2LCACHE):
os.makedirs(IP2LCACHE)
with open(cached, 'w') as file:
file.write(location[0] + ';' + location[1])
def ipcache(ip_addr):
cached = os.path.join(IP2LCACHE, ip_addr) cached = os.path.join(IP2LCACHE, ip_addr)
if not os.path.exists(IP2LCACHE): if not os.path.exists(IP2LCACHE):
os.makedirs(IP2LCACHE) os.makedirs(IP2LCACHE)
@ -98,33 +104,51 @@ def ip2location(ip_addr):
location = None location = None
if os.path.exists(cached): if os.path.exists(cached):
location = open(cached, 'r').read() location = open(cached, 'r').read().split(';')
else: return location
# if IP2LOCATION_KEY is not set, do not the query,
# because the query wont be processed anyway def ip2location(ip_addr):
if IP2LOCATION_KEY: "Convert IP address `ip_addr` to a location name"
try:
ip2location_response = requests\ location = ipcache(ip_addr)
.get('http://api.ip2location.com/?ip=%s&key=%s&package=WS3' \ if location:
% (ip_addr, IP2LOCATION_KEY)).text return location
if ';' in ip2location_response:
open(cached, 'w').write(ip2location_response) # if IP2LOCATION_KEY is not set, do not the query,
location = ip2location_response # because the query wont be processed anyway
except requests.exceptions.ConnectionError: if IP2LOCATION_KEY:
pass try:
location = requests\
.get('http://api.ip2location.com/?ip=%s&key=%s&package=WS3' \
% (ip_addr, IP2LOCATION_KEY)).text
except requests.exceptions.ConnectionError:
pass
if location and ';' in location: if location and ';' in location:
location = location.split(';')[3], location.split(';')[1] location = location.split(';')[3], location.split(';')[1]
else: else:
location = location, None location = location, None
if location:
ipcachewrite(ip_addr, location)
return location return location
def get_location(ip_addr):
"""
Return location pair (CITY, COUNTRY) for `ip_addr`
"""
def ipinfo(ip_addr):
location = ipcache(ip_addr)
if location:
return location
if IPINFO_TOKEN:
r = requests.get('https://ipinfo.io/%s/json?token=%s' %
(ip_addr, IPINFO_TOKEN))
if r.status_code == 200:
location = r.json()["city"], r.json()["country"]
if location:
ipcachewrite(ip_addr, location)
return location
def geoip(ip_addr):
try: try:
response = GEOIP_READER.city(ip_addr) response = GEOIP_READER.city(ip_addr)
country = response.country.name country = response.country.name
@ -132,7 +156,34 @@ def get_location(ip_addr):
except geoip2.errors.AddressNotFoundError: except geoip2.errors.AddressNotFoundError:
country = None country = None
city = None city = None
return city, country
def workaround(city, country):
# workaround for the strange bug with the country name
# maybe some other countries has this problem too
#
# Having these in a separate function will help if this gets to
# be a problem
if country == 'Russian Federation':
country = 'Russia'
return city, country
def get_location(ip_addr):
"""
Return location pair (CITY, COUNTRY) for `ip_addr`
"""
for method in IPLOCATION_ORDER:
if method == 'geoip':
city, country = geoip(ip_addr)
elif method == 'ip2location':
city, country = ip2location(ip_addr)
elif method == 'ipinfo':
city, country = ipinfo(ip_addr)
else:
print("ERROR: invalid iplocation method speficied: %s" % method)
if city is not None:
city, country = workaround(city, country)
return city, country
# #
# temporary disabled it because of geoip services capcacity # temporary disabled it because of geoip services capcacity
# #
@ -143,18 +194,9 @@ def get_location(ip_addr):
# city = location.raw.get('address', {}).get('city') # city = location.raw.get('address', {}).get('city')
# except: # except:
# pass # pass
if city is None:
city, country = ip2location(ip_addr)
# workaround for the strange bug with the country name # No methods resulted in a location - return default
# maybe some other countries has this problem too return NOT_FOUND_LOCATION, None
if country == 'Russian Federation':
country = 'Russia'
if city:
return city, country
else:
return NOT_FOUND_LOCATION, None
def location_canonical_name(location): def location_canonical_name(location):