# vim: fileencoding=utf-8 # vim: foldmethod=marker foldenable: """ [X] emoji [ ] wego icon [ ] v2.wttr.in [X] astronomical (sunset) [X] time [X] frames [X] colorize rain data [ ] date + locales [X] wind color [ ] highlight current date [ ] bind to real site [ ] max values: temperature [X] max value: rain [ ] comment github [ ] commit """ import sys import re import math import json import datetime try: import StringIO except: import io as StringIO import requests import diagram import pyjq import pytz import numpy as np try: from astral import Astral, Location except ImportError: pass from scipy.interpolate import interp1d from babel.dates import format_datetime from globals import WWO_KEY import constants import translations import wttr_line if not sys.version_info >= (3, 0): reload(sys) # noqa: F821 sys.setdefaultencoding("utf-8") # data processing {{{ def get_data(config): """ Fetch data for `query_string` """ url = ( 'http://' 'localhost:5001/premium/v1/weather.ashx' '?key=%s' '&q=%s&format=json&num_of_days=3&tp=3&lang=None' ) % (WWO_KEY, config["location"]) text = requests.get(url).text parsed_data = json.loads(text) return parsed_data def interpolate_data(input_data, max_width): """ Resample `input_data` to number of `max_width` counts """ x = list(range(len(input_data))) y = input_data xvals = np.linspace(0, len(input_data)-1, max_width) yinterp = interp1d(x, y, kind='cubic') return yinterp(xvals) def jq_query(query, data_parsed): """ Apply `query` to structued data `data_parsed` """ pyjq_data = pyjq.all(query, data_parsed) data = map(float, pyjq_data) return data # }}} # utils {{{ def colorize(string, color_code): return "\033[%sm%s\033[0m" % (color_code, string) # }}} # draw_spark {{{ def draw_spark(data, height, width, color_data): """ Spark-style visualize `data` in a region `height` x `width` """ _BARS = u' _▁▂▃▄▅▇█' def _box(height, row, value, max_value): row_height = 1.0 * max_value / height if row_height * row >= value: return _BARS[0] if row_height * (row+1) <= value: return _BARS[-1] return _BARS[int(1.0*(value - row_height*row)/(row_height*1.0)*len(_BARS))] max_value = max(data) output = "" color_code = 20 for i in range(height): for j in range(width): character = _box(height, height-i-1, data[j], max_value) if data[j] != 0: chance_of_rain = color_data[j]/100.0 * 2 if chance_of_rain > 1: chance_of_rain = 1 color_index = int(5*chance_of_rain) color_code = 16 + color_index # int(math.floor((20-16) * 1.0 * (height-1-i)/height*(max_value/data[j]))) output += "\033[38;5;%sm%s\033[0m" % (color_code, character) output += "\n" # labeling max value if max_value == 0: max_line = " "*width else: max_line = "" for j in range(width): if data[j] == max_value: max_line = "%3.2fmm|%s%%" % (max_value, int(color_data[j])) orig_max_line = max_line # aligning it if len(max_line)/2 < j and len(max_line)/2 + j < width: spaces = " "*(j - len(max_line)/2) max_line = spaces + max_line # + spaces max_line = max_line + " "*(width - len(max_line)) elif len(max_line)/2 + j >= width: max_line = " "*(width - len(max_line)) + max_line max_line = max_line.replace(orig_max_line, colorize(orig_max_line, "38;5;33")) break if max_line: output = "\n" + max_line + "\n" + output + "\n" return output # }}} # draw_diagram {{{ def draw_diagram(data, height, width): option = diagram.DOption() option.size = diagram.Point([width, height]) option.mode = 'g' stream = StringIO.StringIO() gram = diagram.DGWrapper( data=[list(data), range(len(data))], dg_option=option, ostream=stream) gram.show() return stream.getvalue() # }}} # draw_date {{{ def draw_date(config, geo_data): """ """ tzinfo = pytz.timezone(geo_data["timezone"]) locale = config.get("locale", "en_US") datetime_day_start = datetime.datetime.utcnow() answer = "" for day in range(3): datetime_ = datetime_day_start + datetime.timedelta(hours=24*day) date = format_datetime(datetime_, "EEE dd MMM", locale=locale, tzinfo=tzinfo) spaces = ((24-len(date))/2)*" " date = spaces + date + spaces date = " "*(24-len(date)) + date answer += date answer += "\n" for _ in range(3): answer += " "*23 + u"╷" return answer[:-1] + " " # }}} # draw_time {{{ def draw_time(geo_data): """ """ tzinfo = pytz.timezone(geo_data["timezone"]) line = ["", ""] for _ in range(3): part = u"─"*5 + u"┴" + u"─"*5 line[0] += part + u"┼" + part + u"╂" line[0] += "\n" for _ in range(3): line[1] += " 6 12 18 " line[1] += "\n" # highlight current time hour_number = \ (datetime.datetime.now(tzinfo) - datetime.datetime.now(tzinfo).replace(hour=0, minute=0, second=0, microsecond=0) ).seconds//3600 for line_number, _ in enumerate(line): line[line_number] = \ line[line_number][:hour_number] \ + colorize(line[line_number][hour_number], "46") \ + line[line_number][hour_number+1:] return "".join(line) # }}} # draw_astronomical {{{ def draw_astronomical(city_name, geo_data): datetime_day_start = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) a = Astral() a.solar_depression = 'civil' city = Location() city.latitude = geo_data["latitude"] city.longitude = geo_data["longitude"] city.timezone = geo_data["timezone"] answer = "" moon_line = "" for time_interval in range(72): current_date = ( datetime_day_start + datetime.timedelta(hours=1*time_interval)).replace(tzinfo=pytz.timezone(geo_data["timezone"])) sun = city.sun(date=current_date, local=False) dawn = sun['dawn'] # .replace(tzinfo=None) dusk = sun['dusk'] # .replace(tzinfo=None) sunrise = sun['sunrise'] # .replace(tzinfo=None) sunset = sun['sunset'] # .replace(tzinfo=None) if current_date < dawn: char = " " elif current_date > dusk: char = " " elif dawn < current_date and current_date < sunrise: char = u"─" elif sunset < current_date and current_date < dusk: char = u"─" elif sunrise < current_date and current_date < sunset: char = u"━" answer += char # moon if time_interval % 3 == 0: moon_phase = city.moon_phase( date=datetime_day_start + datetime.timedelta(hours=time_interval)) moon_phase_emoji = constants.MOON_PHASES[int(math.floor(moon_phase*1.0/28.0*8+0.5)) % len(constants.MOON_PHASES)] if time_interval in [0, 24, 48, 69]: moon_line += moon_phase_emoji + " " else: moon_line += " " answer = moon_line + "\n" + answer + "\n" answer += "\n" return answer # }}} # draw_emoji {{{ def draw_emoji(data): answer = "" for i in data: emoji = constants.WEATHER_SYMBOL.get( constants.WWO_CODE.get( str(int(i)), "Unknown")) space = " "*(3-constants.WEATHER_SYMBOL_WIDTH_VTE.get(emoji)) answer += emoji + space answer += "\n" return answer # }}} # draw_wind {{{ def draw_wind(data, color_data): def _color_code_for_wind_speed(wind_speed): color_codes = [ (3, 241), # 82 (6, 242), # 118 (9, 243), # 154 (12, 246), # 190 (15, 250), # 226 (19, 253), # 220 (23, 214), (27, 208), (31, 202), (-1, 196) ] for this_wind_speed, this_color_code in color_codes: if wind_speed <= this_wind_speed: return this_color_code return color_codes[-1][1] answer = "" answer_line2 = "" for j, degree in enumerate(data): degree = int(degree) if degree: wind_direction = constants.WIND_DIRECTION[((degree+22)%360)/45] else: wind_direction = "" color_code = "38;5;%s" % _color_code_for_wind_speed(int(color_data[j])) answer += " %s " % colorize(wind_direction, color_code) # wind_speed wind_speed = int(color_data[j]) wind_speed_str = colorize(str(wind_speed), color_code) if wind_speed < 10: wind_speed_str = " " + wind_speed_str + " " elif wind_speed < 100: wind_speed_str = " " + wind_speed_str answer_line2 += wind_speed_str answer += "\n" answer += answer_line2 + "\n" return answer # }}} # panel implementation {{{ def add_frame(output, width, config): """ Add frame arond `output` that has width `width` """ empty_line = " "*width output = "\n".join(u"│"+(x or empty_line)+u"│" for x in output.splitlines()) + "\n" weather_report = \ translations.CAPTION[config["lang"]] \ + " " \ + (config["override_location"] or config["location"]) caption = u"┤ " + " " + weather_report + " " + u" ├" output = u"┌" + caption + u"─"*(width-len(caption)) + u"┐\n" \ + output + \ u"└" + u"─"*width + u"┘\n" return output def generate_panel(data_parsed, geo_data, config): """ """ max_width = 72 precip_mm_query = "[.data.weather[] | .hourly[]] | .[].precipMM" precip_chance_query = "[.data.weather[] | .hourly[]] | .[].chanceofrain" feels_like_query = "[.data.weather[] | .hourly[]] | .[].FeelsLikeC" weather_code_query = "[.data.weather[] | .hourly[]] | .[].weatherCode" wind_direction_query = "[.data.weather[] | .hourly[]] | .[].winddirDegree" wind_speed_query = "[.data.weather[] | .hourly[]] | .[].windspeedKmph" output = "" output += "\n\n" output += draw_date(config, geo_data) output += "\n" output += "\n" output += "\n" data = jq_query(feels_like_query, data_parsed) data_interpolated = interpolate_data(data, max_width) output += draw_diagram(data_interpolated, 10, max_width) output += "\n" output += draw_time(geo_data) data = jq_query(precip_mm_query, data_parsed) color_data = jq_query(precip_chance_query, data_parsed) data_interpolated = interpolate_data(data, max_width) color_data_interpolated = interpolate_data(color_data, max_width) output += draw_spark(data_interpolated, 5, max_width, color_data_interpolated) output += "\n" data = jq_query(weather_code_query, data_parsed) output += draw_emoji(data) data = jq_query(wind_direction_query, data_parsed) color_data = jq_query(wind_speed_query, data_parsed) output += draw_wind(data, color_data) output += "\n" output += draw_astronomical(config["location"], geo_data) output += "\n" output = add_frame(output, max_width, config) return output # }}} # textual information {{{ def textual_information(data_parsed, geo_data, config): """ Add textual information about current weather and astronomical conditions """ def _shorten_full_location(full_location, city_only=False): def _count_runes(string): return len(string.encode('utf-16-le')) // 2 words = full_location.split(",") output = words[0] if city_only: return output for word in words[1:]: if _count_runes(output + "," + word) > 50: return output output += "," + word return output city = Location() city.latitude = geo_data["latitude"] city.longitude = geo_data["longitude"] city.timezone = geo_data["timezone"] output = [] timezone = city.timezone datetime_day_start = datetime.datetime.now()\ .replace(hour=0, minute=0, second=0, microsecond=0) sun = city.sun(date=datetime_day_start, local=True) format_line = "%c %C, %t, %h, %w, %P" current_condition = data_parsed['data']['current_condition'][0] query = {} weather_line = wttr_line.render_line(format_line, current_condition, query) output.append('Weather: %s' % weather_line) output.append('Timezone: %s' % timezone) tmp_output = [] tmp_output.append(' Now: %%{{NOW(%s)}}' % timezone) tmp_output.append('Dawn: %s' % str(sun['dawn'].strftime("%H:%M:%S"))) tmp_output.append('Sunrise: %s' % str(sun['sunrise'].strftime("%H:%M:%S"))) tmp_output.append(' Zenith: %s' % str(sun['noon'].strftime("%H:%M:%S "))) tmp_output.append('Sunset: %s' % str(sun['sunset'].strftime("%H:%M:%S"))) tmp_output.append('Dusk: %s' % str(sun['dusk'].strftime("%H:%M:%S"))) tmp_output = [ re.sub("^([A-Za-z]*:)", lambda m: colorize(m.group(1), "2"), x) for x in tmp_output] output.append( "%20s" % tmp_output[0] \ + " | %20s " % tmp_output[1] \ + " | %20s" % tmp_output[2]) output.append( "%20s" % tmp_output[3] \ + " | %20s " % tmp_output[4] \ + " | %20s" % tmp_output[5]) city_only = False suffix = "" if "Simferopol" in timezone: city_only = True suffix = ", Крым" if config["full_address"]: output.append('Location: %s%s [%5.4f,%5.4f]' \ % ( _shorten_full_location(config["full_address"], city_only=city_only), suffix, geo_data["latitude"], geo_data["longitude"], )) output = [ re.sub("^( *[A-Za-z]*:)", lambda m: colorize(m.group(1), "2"), re.sub("^( +[A-Za-z]*:)", lambda m: colorize(m.group(1), "2"), re.sub(r"(\|)", lambda m: colorize(m.group(1), "2"), x))) for x in output] return "".join("%s\n" % x for x in output) # }}} # get_geodata {{{ def get_geodata(location): text = requests.get("http://localhost:8004/%s" % location).text return json.loads(text) # }}} def main(location, override_location=None, data=None, full_address=None, view=None): config = { "lang": "en", "locale": "en_US", "location": location, "override_location": override_location, "full_address": full_address, "view": view, } geo_data = get_geodata(location) if data is None: data_parsed = get_data(config) else: data_parsed = data output = generate_panel(data_parsed, geo_data, config) output += textual_information(data_parsed, geo_data, config) return output if __name__ == '__main__': sys.stdout.write(main(sys.argv[1]))