Article épinglé

mardi 25 juin 2019

Python, démocratie, RIP et aéroport de Paris...

Vous allez rire... le script existe en python
https://www.adprip.fr/
Vous êtes curieux, lisez la suite... Et en passant vous pouvez aussi participer ;))) vous verrez que ce n'est pas simple... ni de voter, ni d'attendre les 4 717 396 soutiens :-(

#! / usr / bin / env python
# - * - codage: utf-8 - * -

'' '
    Ce programme est un logiciel libre: vous pouvez le redistribuer et / ou modifier
    sous les termes de la licence publique générale GNU telle que publiée par
    la Free Software Foundation, soit la version 3 de la licence, soit
    (à votre choix) toute version ultérieure.

    Ce programme est distribué dans l’espoir qu’il sera utile,
    mais SANS AUCUNE GARANTIE; sans même la garantie implicite de
    QUALITÉ MARCHANDE ou APTITUDE À UN USAGE PARTICULIER. Voir le
    Licence publique générale GNU pour plus de détails.

    Vous devriez avoir reçu une copie de la licence publique générale GNU.
    avec ce programme. Sinon, voir <https://www.gnu.org/licenses/>.
'' '

depuis les collections import defaultdict

système d'importation
importation os
importer gzip
chaîne d'importation
temps d'importation
importer un fichier

demandes d'importation
importer json
importer en base64

# dossier contenant des données publiques (site web html)
WEB_ROOT = "./"

# dossier contenant des données privées (clé anticaptcha, cookies)
KEYS_ROOT = "./../"

'' 'UPDATE 17/06 23:00 -> captchas ajoutes: sur un besoin substitut anti captcha.
    https://anti-captcha.com
    ANTICAPTCHA_KEY
'' '
ANTICAPTCHA_KEY_FILE = KEYS_ROOT + "key.dat"
ANTICAPTCHA_KEY = ""
avec open (ANTICAPTCHA_KEY_FILE) en tant que ac_file:
    ANTICAPTCHA_KEY = ac_file.readlines () [0] .rstrip ("\ r \ n")

'' 'UPDATE 17/06 20:00 -> google captcha ajoute (je crois le gvt anti-GAFAM? Mais que se passe-t-il?)
    une fois le captcha valide, il ajoute des cookies, les charger ici (j'en ai 5 apparement, peut
    etre certains sont inutiles)

    Stockez le cookie à l'emplacement COOKIES_FILE avec comme format:

        incap_ses_XXX_XXXX..XX: cookie_value
        nlbi_XXXX..XX: cookie_value
        visid_incap_XXXX..XX: cookie_value
        A10_Insert-XXXXX: cookie_value
        PHPSESSID: cookie_value

    je ne mets pas mon cookie dans ce script open-source pour proteger ma vie privée. Utilisez votre propre cookies.

    NOTE: cookie? et la RGPD dans tout ca? L'europe devrait mettre une modification de ce site du gouvernement ... moins qu'il soit au dessus de ca.
'' '
COOKIES_FILE = KEYS_ROOT + "cookies.dat"
COOKIES = {}
avec open (COOKIES_FILE) comme cookie:
    cookies = cookie.readlines ()
    pour c dans les cookies:
        data = c.split (":")
        COOKIES [data [0]] = data [1] .rstrip ("\ r \ n")



def display_html_num (num):
    '' 'Format un chiffre avec' l'espace mince 'entre milliers / millions / ...' ''
    retourne '{:,}'. format (num) .replace (',', "& thinsp;")

def trim_eol (data):
    return data.replace ("\ r", ""). replace ("\ n", "")

def clean_data (ville):
    '' 'Nettoie une ville du HTML au format CSV' ''
    return ville.replace ("& # 039;", "'")

def get_ville_key (VILLES, ville):
    '' 'Pour un dictionnaire de la localité VILLES, trouve une clé possible correspondant à une ville' ''
    si len (ville) == 0:
        retour "autres" #TODO: gerer francais de l'etranger

    si ville dans VILLES: #si c'est une clé, retournez la clé
        retour ville

    pour k, v dans VILLES.iteritems (): #sinon la ville est dans la clé (match partiel qui n'a pas fait la correspondance complète), retourner la clé entière
        si ville en k:
            retourne k

    retour "autres" #sinon echec de trouver la ville

def parse_data (VILLES, data):
    '' 'Lire le contenu de données et MAJ VILLES' ''
    data = data.split ("<td width = \" 33% \ ">") [1:]
    pour i, d en énumérer (données):
        si je% 2 == 1:
            ville = clean_data (d [: d.find ("</ td>")])
            ville_key = get_ville_key (VILLES, ville)
            VILLES [ville_key] [2] + = 1


    retour VILLES


def department_reduce (localites):
    '' 'Prends en argument un dictionnaire local ~> nombre de votants et retourne un dictionnaire numero du département ~> nombre de votants' ''
    out = {}
    pour v dans localites.values ​​():
        si v [0] == "00000": #ignorer villes inconnues
            continuer

        sinon, isDigit (v [0] [0]):
            continuer

        CODE = v [0] [0: 2]
        si v [0] .startswith ("971"): #guadeloupe
            CODE = "GP"
        elif v [0] .startswith ("972"): #martinique
            CODE = "MQ"
        elif v [0] .startswith ("973"): #guyanne
            CODE = "GF"
        elif v [0] .startswith ("974"): #union
            CODE = "RE"
        elif v [0] .startswith ("976"): #mayotte
            CODE = "YT"

        si CODE est sorti:
            out [CODE] + = v [2]
        autre:
            out [CODE] = v [2]

    sortir


def url_dll (url):
    résultat = ""
    tandis que len (résultat) == 0:
        try: #URL bug par moment: OpenSSL.SSL.ZeroReturnError
            rep = requests.get (url, cookies = COOKIES)
            while len (rep.content) == 0:
                temps.sommeil (0.5)
                rep = requests.get (url)

            résultat = rep.content
        sauf exception comme e:
            print ("Exception pour url '" + url + "':" + str (e))
            heure (1)

    résultat retourné

def extract_data_between (data, start, end):
    data = data [data.find (début) + len (début):]
    data = data [: data.find (fin)]
    renvoyer des données

def check_captcha_task (taskId, sleepTime):
    '' 'Recupere une tache de reconnaissance de captcha' ''
    resultat = requests.post ("https://api.anti-captcha.com/getTaskResult", json = {"clientKey": ANTICAPTCHA_KEY, "taskId": taskId})
    essayer:
        jresult = json.loads (result.content)
        while jresult ["status"]! = "ready":
            si jresult ["status"] == "processing":
                time.sleep (sleepTime)
                resultat = requests.post ("https://api.anti-captcha.com/getTaskResult", json = {"clientKey": ANTICAPTCHA_KEY, "taskId": taskId})
                jresult = json.loads (result.content)
            autre:
                levée Exception ("erreur lors du traitement de la microtache:" + result.content)
    sauf exception comme e:
        levée Exception ("erreur lors du traitement de la microtache:" + result.content + "(exception:" + str (e) + ")")

    retourne

def make_captcha_task (task_data):
    return requests.post ("http://api.anti-captcha.com/createTask", json = {"clientKey": ANTICAPTCHA_KEY, "task": task_data})

def get_captcha (image):
    '' 'Lance un tache de reconnaissance de captcha' ''
    ret = ""
    erreur_count = 0
    tandis que len (ret) == 0:
        essayer:
            task = make_captcha_task ({"type": "ImageToTextTask", "body": "\" "+ image +" \ "", "case": True})
            jtask = json.loads (task.content)
            si jtask ["errorId"]! = 0:
                levée Exception ("n'a pas pu exécuter microtask:" + task.content)

            ret = check_captcha_task (jtask ["taskId"], 1) ["solution"] ["text"]
        sauf exception comme e:
            heure (1)
            si erreur_count> 5:
                print ("Le captcha a echoue plus de 10 fois, abandonne ...")
                sys.exit (-1)

            erreur_count + = 1
            print ("Erreur en capturant le captcha:" + str (e) + ", nouvelle tentative ...")

    retour ret

def isDigit (c):
    retourne c> = '0' et c <= '9'

def get_num_pages (data):
    '' 'Extrait le nombre de page (pagination) pour un élément IJ donne' ''
    idx = 0
    nums = []
    tant que idx! = -1:
        idx = data.find ("page =", idx + 1)
        numIdxStart = idx + len ("page =")
        numIdx = numIdxStart
        while isDigit (data [numIdx]):
            numIdx + = 1

        num = data [numIdxStart: numIdx]
        si len (num)! = 0:
            nums.append (int (num))

    ret = list (dict.fromkeys (nums)) #enleve les doublons

    si len (ret) == 0:
        retour ret

    ret.sort ()
    si ret [len (ret) -1]> = 10:
        plage de retour (2, ret [len (ret) -1] +1)

    retour ret

def bypass_captcha (url, idx, I, J):
    '' 'Dll IJ page idx en bypassant le captcha' ''
    current_url = url + "/" + I + "/" + I + J
    si idx! = 1:
        current_url + = "? page =" + str (idx)

    data = trim_eol (url_dll (current_url))
    si "Demande infructueuse. ID d'incident Incapsula" dans les données ou "https://www.google.com/recaptcha/api2/bframe" dans les données: #need pour générer un nouvel ensemble de cookies
        imprimer (COOKIES)
        raise Exception ("Arrêt du cookie Incapsula")

    '' '#essaye de MAJ automatiquement les cookies mais ça ne semble pas possible? si quelqu'un a une idee ....
        iframe = extract_data_between (données, "iframe src = \" "," \ "frameborder = 0")
        website_url = "https://www.referendum.interieur.gouv.fr/"+iframe
        iframe_data = trim_eol (url_dll (website_url))
        data_sitekey = extract_data_between (iframe_data, "data-sitekey = \" "," \ "data-callback = \" ")
        task = make_captcha_task ({"type": "NoCaptchaTaskProxyless", "websiteKey": data_sitekey, "websiteURL": website_url.replace ("/", "\\ /")})
        print (task.content)

        jtask = json.loads (task.content)
        si jtask ["errorId"]! = 0:
            levée Exception ("n'a pas pu exécuter microtask:" + task.content)

        print (check_captcha_task (jtask ["taskId"]))

        print ("Besoin d'actualiser les cookies ...")
        os.Exit (1)
    '' '

    orig_data = data
    erreur_count = 0
    tandis que "Captcha" dans les données et "Saisir les caract" dans les données:
        essayer:
            token = extract_data_between (data, "name = \" form [_token] \ "class = \" form-control \ "value = \" "," Retour ")
            token = extract_data_between (jeton, "", "\" /> ")
            print ("\ tGot captcha token =" + token)
            image = requests.get ("https://www.referendum.interieur.gouv.fr/bundles/ripconsultation/securimage/securimage_show.php", cookies = COOKIES, stream = True)
            CAPTCHA_VAL = get_captcha (base64.b64encode (image.content))
            print ("\ t" + str (idx) + "," + I + "," + J + "," + "CAPTCHA =" + CAPTCHA_VAL)

            rep = requests.post (current_url, cookies = COOKIES, data = {"form [captcha]": CAPTCHA_VAL, "form [_token]": jeton})
            data = trim_eol (rep.content)
        sauf exception comme e:
            print ("Erreur lors de la validation / récupération du captcha:" + str (e) + ", nouvelle tentative après 1s ...")
            data = orig_data
            heure (1)

            erreur_count + = 1
            si erreur_count> 5:
                print ("Le captcha a echoue plus de 10 fois, abandonne ...")
                sys.exit (-1)



    renvoyer des données

def load_insee_data (fpath):
    '' 'Charge les données de l'insee du fichier fpath, retourne un dictionnaire: nom ville ~> [code insee, nombre electeurs, 0]' ''
    VILLES = {} #Liste des villes avec le nombre total d'inscriptions sur le site de l'INSEE + arrondissements Paris / Lyon / Marseille (Premier tour 2017)
    VILLES ["autres"] = ["00000", 1,0]
    avec open (fpath, "r") comme villes:
        pour ville in villes:
            data = ville.split (";")
            si len (données)! = 3:
                print (ville, data)
                levée Exception ("insee CSV invalide")

            nb_inscrit = data [2] .rstrip ("\ n")
            si nb_inscrit == "N / A":
                continuer

            VILLES [data [1]] = [data [0], int (data [2] .rstrip ("\ n")), 0]

    retour VILLES

def full_check_data (url):
    '' Verifie les MAJ sur URL et retourne une hashmap: localite ~> nombre de votants '' '
    VILLES = load_insee_data (WEB_ROOT + "villes_insee.dat")
    pour I dans "A": # string.ascii_uppercase:
        pour J dans "AB": # string.ascii_uppercase:
            print ("Récupération" + I + J + "...")
            data = bypass_captcha (url, 1, I, J)

            si "Aucun soutien n'est accepté" dans les données:
                continuer

            parse_data (VILLES, data) #parse des données actuelles

            pages = get_num_pages (data) #get autres pages
            pour p dans les pages:
                print ("Récupération" + I + J + "." + str (p) + "...")
                data = bypass_captcha (url, p, I, J)
                parse_data (VILLES, data)

    retour VILLES

def archive_write (dest, localites):
    '' 'Sauvegarde le dictionnaire local au format csv dans le fichier dest' ''
    nombre_autres = localites ["autres"] [2]
    pour ville, v dans localites.items ():
        code_insee = v [0]
        total_inscrit = v [1]
        soutien = v [2]
        si code_insee == "00000": #ignorer le code autres
            continuer

        si soutien> total_inscrit:
            print ("ATTENTION: plus de soutiens d'inscrits pour:", ville, v, "probablement get_ville_key a corriger; deplace cette ville dans autres 00000")
            nombre_autres + = soutien
            continuer

        dest.write (code_insee + ";" + ville + ";" + str (total_inscrit) + ";" + str (soutien) + "\ n")

    #concatene autres; ici total_inscrit = soutien de total_inscrit est faux (insee, ville fictive)
    dest.write ("00000; autres;" + str (localites ["autres"] [2]) + ";" + str (localites ["autres"] [2]) + "\ n")

def generate_result (count):
    '' 'Prends une hashmap localite in entree, retourne {nombre de votants, la chaîne à remplacer dans index.html}' ''
    requis = 4717396

    out = display_html_num (count) + "& nbsp; / & nbsp;" + display_html_num (requis) + "("
    pourcentage = 100 * float (count) / float (requis)
    si compte == 0:
        out + = "0"
    nombre elif <= requis / 100: #moins de un pourcent
        out + = "{: 1.5f}". format (pourcentage)
    autre:
        out + = "{: 2.2f}". format (pourcentage)
    out + = "%)"

    sortir

def quick_check_data (url, last_count):
    page = 2
    avec open (last_count, "r") comme lpage: #check le décompte de la dernière page
        page = int (lpage.readlines () [0] .rstrip ("\ r \ n"))

    print ("Obtenir la page" + str (page) + "...")
    data = bypass_captcha (url, page, "*", "")
    imprimer (données)
    pages = get_num_pages (données)
    imprimer (pages)
    if pages [len (pages) -1]> page: #update nombre de pages si nécessaire
        page = pages [len (pages) -1]
        print ("Nouvelle dernière page trouvée" + str (page) + ", MAJing ...")
        si os.path.exists (last_count):
            os.remove (last_count)
        avec open (last_count, "w") comme lpage:
            lpage.write (str (page))

        data = bypass_captcha (url, page, "*", "")

    data = data.split ("<td width = \" 33% \ ">") [1:]
    retourne 200 * (page-1) + len (data) / 2

def update_index (OUT):
    cpath = WEB_ROOT + "count.html"
    si os.path.exists (cpath):
        os.remove (cpath)
    avec open (cpath, "a") comme compteur:
        counter.write (OUT + "<a href=\"https://www.adprip.fr/\"> <img src = \" https://upload.wikimedia.org/wikipedia/commons/6/64/Icon_External_Link .png \ "> </a>")

    pour la ligne dans fileinput.input (WEB_ROOT + "index.html", inplace = True):
       si line.startswith ("<! - CRON ->"):
           print ("<! - CRON ->" + OUT)
       autre:
           print (line.rstrip ('\ r \ n'))

def write_history (count):
    TS = str (int (time.time ()))
    avec open (WEB_ROOT + "history.dat", "a") comme historique:
        history.write (TS + ";" + str (count) + "\ n")

    retourne TS

def update_data (localites):
    '' 'MAJ les données sur le serveur en fonction de hashmap localite' ''
    COUNT = 0
    pour v dans localites.values ​​():
        COUNT + = v [2]
    OUT = generate_result (COUNT)

    print ("Index mis à jour vers:" + OUT)
    update_index (OUT)


    si COUNT! = 0:
        TS = write_history (COUNT)
        avec gzip.open (WEB_ROOT + "archive /" + TS + ". dat.gz", "w") comme archive:
            archive_write (archive, localites)

        # données géographiques les plus récentes
        latest = WEB_ROOT + "archive / latest"
        si os.path.exists (dernière):
            os.remove (last)
        avec ouvert (dernier, "w") comme ldata:
            archive_write (ldata, localites)

        #js map
        map_path = WEB_ROOT + "js / vote_data.js"
        si os.path.exists (map_path):
            os.remove (map_path)
        avec open (map_path, "w") comme jsmap:
            jsmap.write ("var vote = {\ n")
            pour k, v dans department_reduce (localites) .items ():
                jsmap.write ("\ t \" FR - "+ k +" \ ":" + str (v) + ", \ n")
            jsmap.write ("}; \ n")

si __name__ == "__main__":
    si len (sys.argv) == 2 et sys.argv [1] == "complet":
        LOCALITES = full_check_data ("https://www.referendum.interieur.gouv.fr/consultation_publique/8")
        update_data (LOCALITES)
    autre:
        count = quick_check_data ("https://www.referendum.interieur.gouv.fr/consultation_publique/8", WEB_ROOT + "last_count")
        OUT = generate_result (count)
        print ("Index mis à jour vers:" + OUT)
        update_index (OUT)
        write_history (count)

Aucun commentaire:

Enregistrer un commentaire

Tout commentaire nous engage ;)