import json import os, os.path import requests import characters_ssbu # ============================================================================= API_HOST = "api.smash.gg" API_ENDPOINT = "gql/alpha" API_SCHEME = "https" API_URL = "{scheme}://{host}/{endpoint}".format( scheme = API_SCHEME, host = API_HOST, endpoint = API_ENDPOINT, ) CHARACTERS = { c.smashggid : c.name for c in characters_ssbu.EVERYONE } PLAYERSKINS = {} # ============================================================================= # ----------------------------------------------------------------------------- class Player(): """A Player, as registered by the smash.gg API, and their characters choices and placement in a tournament""" # ------------------------------------------------------------------------- def __init__( self, id, prefix, gamerTag, placement, seeding, twitterHandle = None, chars = None ): self.id = int(id) # actually intended to store an Entrant id self.prefix = ( "" if prefix is None else prefix ) self.gamerTag = gamerTag self.placement = int(placement) self.seeding = int(seeding) self.twitterHandle = ( "" if twitterHandle is None else twitterHandle ) if chars is None: self.chars = {} else: self.chars = chars # ------------------------------------------------------------------------- def add_character_selection(self, character, win): if type(character) != tuple: try: charname = CHARACTERS[character] except KeyError: charname = character # Unknown char -> sgg id try: skin = PLAYERSKINS[self.gamerTag.lower()][charname] except KeyError: skin = "00" # default skin character = ( charname, skin ) try: self.chars[character] += ( 1.01 if win else 1.00 ) # This 1.01 / 1.00 tricks should hold until the player loses 101 # matches with one character and wins 100 matches with another... # during one tournament. except KeyError: self.chars[character] = ( 1.01 if win else 1.00 ) # ------------------------------------------------------------------------- def get_mains(self): return [ cv[0] for cv in sorted( self.chars.items(), key = lambda cv: cv[1], reverse = True, ) ] # ------------------------------------------------------------------------- def conf(self): # The char list looks like 'character1_skin1 (12.08), character2_skin2 # (3.02)' where the number between parenthesis is the number of time # the character was played charslist = ", ".join( [ "{}_{} ({:.2f})".format(c,s,n) for (c,s),n in sorted( self.chars.items(), key = lambda cv: cv[1], reverse = True,) ]) return """ [player {tag}] tag: {tag} team: {pfx} seeding: {seed} placement: {plc} twitter: {twi} characters: {charslist}""" \ .format( tag = self.gamerTag, pfx = self.prefix, seed = self.seeding, plc = self.placement, twi = self.twitterHandle, charslist = charslist, ) # ----------------------------------------------------------------------------- class Tournament(): """A Tournament, as registered by the smash.gg API""" # ------------------------------------------------------------------------- def __init__( self, id, name, slug, startAt, numEntrants, venueAddress = None, venueName = None, city = None, countryCode = None, hashtag = None, ): self.id = id self.name = name self.slug = slug.split("/")[-1] self.startAt = startAt self.numEntrants = numEntrants self.venueAddress = venueAddress self.venueName = venueName self.city = city self.countryCode = countryCode self.hashtag = hashtag # ------------------------------------------------------------------------- def conf(self): return """ [Tournament] name: {name} date: {date} location: {loc} - {addr} ({city}, {ctry}) slug: {slug} numEntrants: {nbe}""" \ .format( id = self.id, name = self.name, date = self.startAt, loc = self.venueName, addr = self.venueAddress, city = self.city, ctry = self.countryCode, slug = self.slug, nbe = self.numEntrants, ) # ------------------------------------------------------------------------- def clean_name(self, name_seo_delimiter): if name_seo_delimiter is not None: self.name = self.name.split(name_seo_delimiter)[0].strip() return self # ============================================================================= def run_query( query_name, variables = {}, token = "", proxy = None, query_dir = "queries", query_extension = "gql", api_url = API_URL, log = None, ): # Load query query_path = os.path.join( query_dir, "{}.{}".format( query_name, query_extension, ) ) query = "" with open(query_path, 'r') as query_file: query = query_file.read() # Build payload payload = { "query": query, "variables": json.dumps(variables), } # Build headers headers = { "Authorization": "Bearer {token}".format(token = token), "Accept": "application/json", "Content-Type": "application/json" } # Send the query try: log.debug("Sending query '{}' with variables:".format(query_name)) log.debug(json.dumps(variables)) except AttributeError: pass rv = requests.post( API_URL, json.dumps(payload).encode("utf-8"), headers = headers, proxies = {"http": proxy, "https": proxy}, ) try: rv_json = rv.json() except Exception as e: log.error("HTTP request failed") log.error(e) log.debug(e, exc_info=True) log.debug(rv) # Try to return the data, or log the error and return None try: log.debug("query complexity : {}" \ .format( rv_json["extensions"]["queryComplexity"], )) return rv_json["data"] except KeyError: try: log.error("GraphQL error") log.error(rv_json) except AttributeError: pass return None