MacOS : Python : Suppression des doublons d’emails avec l’API Python Elasticsearch/Kibana (Version V3)

33 x served & 10 x viewed

Finalement dans les 200.000 emails je pense avoir des doublons … je vais donc profiter de l’export vers Elastciseach/Kibana pour voir si j’ai des doublons. L’email qu’il va avoir la même taille et le même checksum MD5 sera considéré comme un doublons.

Voici donc la version V3 (sans la suppression de fichier : os.unlink(path) )

#!/usr/bin/env python3

import email
import plistlib
import hashlib
import re
import glob, os
import string
from datetime import datetime
from email.utils import parsedate_to_datetime
from email.header import Header, decode_header, make_header
from elasticsearch import Elasticsearch 

class Emlx(object):
        def __init__(self):
            super(Emlx, self).__init__()
            self.bytecount = 0
            self.msg_data = None
            self.msg_plist = None

        def parse(self, filename_path):
            with open(filename_path, "rb") as f:
                self.bytecount = int(f.readline().strip())
                self.msg_data = email.message_from_bytes(f.read(self.bytecount))
                self.msg_plist = plistlib.loads(f.read())
            return self.msg_data, self.msg_plist

def md5(fname):
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

if __name__ == '__main__':
   msg = Emlx()
   nb_parse = 0
   nb_error = 0
   save_space = 0
   list_email = []
   printable = set(string.printable)
   path_mail = "/Users/MonLogin/Library/Mail/V6/"
   es_keys = "mail"
   es=Elasticsearch([{'host':'localhost','port':9200}])
   for root, dirs, files in os.walk(path_mail):
      for file in files:
          if file.endswith(".emlx"):
             file_full = os.path.join(root, file)
             my_check = md5(root+'/'+file)
             my_count = list_email.count(my_check)
             list_email.append(my_check)
             message, plist = msg.parse(file_full)
             statinfo = os.stat(file_full)
             if (my_count > 0):
                save_space += int(statinfo.st_size)
                #os.unlink(root+'/'+file)
             my_date = message['Date']
             my_id = message['Message-ID']
             my_server = message['Received']
             my_date_str = ""
             if my_date is not None and my_date is not Header:
                 try:
                   my_date_str = datetime.fromtimestamp(parsedate_to_datetime(my_date).timestamp()).strftime('%Y-%m-%dT%H:%M:%S')
                 except :
                   my_date_str = ""
             my_email = str(message['From'])
             my_email = str(make_header(decode_header(my_email)))
             if my_email is not None:
                 my_domain = re.search("@[\w.\-\_]+", str(my_email))
                 if my_domain is not None:
                      my_domain_str = str(my_domain.group ());
                      my_domain_str = my_domain_str.lower()
             if my_email is not None:
                 my_name = re.search("[\w.\-\_]+@", str(my_email))
                 if my_name is not None:
                      my_name_str = str(my_name.group ());
                      my_name_str = my_name_str.lower()
             json = '{"checksum":"'+my_check+'","count":"'+str(my_count)+'","size":'+str(statinfo.st_size)
             if my_domain is not None:
                 #print(my_domain.group())
                 #print(my_name.group())
                 json = json+',"name":"'+my_name_str+'","domain":"'+my_domain_str+'"'
             else:
                 my_email = my_email.replace(",","")
                 my_email = my_email.replace('"','')
                 my_email = str(re.sub(r'[^\x00-\x7f]',r'', my_email)) 
                 my_email = my_email.lower()
                 json = json+',"name":"'+my_email+'","domain":"None"';
             if my_date is not None and len(my_date_str) > 1:
                 json = json+',"date":"'+my_date_str+'","id":'+str(nb_parse)
             else:
                 json = json+',"id":'+str(nb_parse)
             if my_server is not None and my_server is not Header:
                 ip = re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', str(my_server))
                 if ip is not None:
                    my_ip = ip.group()
                    json = json+',"ip":"'+str(my_ip)+'"'
                 else:
                    my_ip = ""
                 #ip = re.findall(r'\b25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?\.25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?\.25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?\.25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?\b',my_server)
                 #ip = re.findall( r'[0-9]+(?:\.[0-9]+){1,3}', my_server )
                 #ip = re.findall(r'[\d.-]+', my_server) 
             else:
                 json = json
             if my_id is not None and my_id is not Header:
                 try:
                    my_id =my_id.strip()
                    my_id =my_id.strip('\n')
                    json = json+',"Message-ID":"'+my_id+'","file":"'+file+'"}'
                 except:
                    json = json+',"file":"'+file+'"}'
             else:
                 json = json+',"file":"'+file+'"}'
             print(json)
             try:
                res = es.index(index=es_keys,doc_type='emlx',id=nb_parse,body=json)
             except:
                nb_error += 1   
             nb_parse += 1
             #print(plist)
   print(nb_parse)

A suivre pour la V4 !

MacOS : Python : Découverte de l’API Python Elasticsearch/Kibana avec SVN pour des stats

19 x served & 5 x viewed

Petit script pour envoyer l’historique d’un SVN vers Elasticsearch/Kibana. Avant j’utilisais statSVN : https://statsvn.org .

Pour l’installation sous Mac :

$ pip2 install --upgrade pip
$ pip2 install elasticsearch

Voici le programme :

import xml.etree.ElementTree as ET
import os
import re
from elasticsearch import Elasticsearch 
import sys

tree = ET.parse('svn.log')
root = tree.getroot()

count = 0;
nb_error = 0
es=Elasticsearch([{'host':'localhost','port':9200}])
es_keys="svn"

for logentry in root.iter('logentry'):
   revision = logentry.get('revision')
   author = logentry.find('author').text
   date = logentry.find('date').text
   msg = logentry.find('msg').text
   if msg is not None:
      msg = msg.replace("\n", " ")
      msg = msg.replace("\r", " ")
      msg = msg.rstrip('\r\n')
      msg = msg.strip('\r\n')
      msg = str(re.sub(r'[^\x00-\x7F]',' ', msg))
   paths = logentry.find('paths') 
   for path in paths.findall('path'):
      my_path = path.text
      my_basename = os.path.basename(my_path)
      my_dir = os.path.dirname(my_path)
      count += 1
      if msg is not None:
         json = '{"revision":'+revision+',"author":"'+author+'","date":"'+date+'","msg":"'+msg+'","basename":"'+my_basename+'","folder":"'+my_dir+'"}'
      else:
         json = '{"revision":'+revision+',"author":"'+author+'","date":"'+date+'","basename":"'+my_basename+'","folder":"'+my_dir+'"}'
      print(count,json)
      try:
        res = es.index(index=es_keys,doc_type='svn',id=count,body=json)
      except:
        nb_error += 1  

Il faut faire un export XML de SVN :

$ svn log -v --xml  > svn.log

Pour faire le check : http://127.0.0.1:9200/svn/_mappings , la réponse est du type :

{"svn":{"mappings":{"svn":{"properties":{"author":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"basename":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"date":{"type":"date"},"folder":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"msg":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"revision":{"type":"long"}}}}}}

MacOS : Python : Découverte de l’API Python Elasticsearch/Kibana (Version V2)

32 x served & 8 x viewed

J’ai finalement fait une version V2 qui corrige quelques problèmes (Exception de quelques emails our quelques noms de domaines). Il faut mettre son MonLogin .

#!/usr/bin/env python3

import email
import plistlib
import re
import glob, os
import string
from datetime import datetime
from email.utils import parsedate_to_datetime
from email.header import Header, decode_header, make_header
from elasticsearch import Elasticsearch 

class Emlx(object):
        def __init__(self):
            super(Emlx, self).__init__()
            self.bytecount = 0
            self.msg_data = None
            self.msg_plist = None

        def parse(self, filename_path):
            with open(filename_path, "rb") as f:
                self.bytecount = int(f.readline().strip())
                self.msg_data = email.message_from_bytes(f.read(self.bytecount))
                self.msg_plist = plistlib.loads(f.read())
            return self.msg_data, self.msg_plist

if __name__ == '__main__':
   msg = Emlx()
   nb_parse = 0
   nb_error = 0
   printable = set(string.printable)
   path_mail = "/Users/MonLogin/Library/Mail/V6/"
   es_keys = "mail"
   es=Elasticsearch([{'host':'localhost','port':9200}])
   for root, dirs, files in os.walk(path_mail):
      for file in files:
          if file.endswith(".emlx"):
             file_full = os.path.join(root, file)
             message, plist = msg.parse(file_full)
             statinfo = os.stat(file_full)
             my_date = message['Date']
             my_id = message['Message-ID']
             my_server = message['Received']
             my_date_str = ""
             if my_date is not None and my_date is not Header:
                 try:
                   my_date_str = datetime.fromtimestamp(parsedate_to_datetime(my_date).timestamp()).strftime('%Y-%m-%dT%H:%M:%S')
                 except :
                   my_date_str = ""
             my_email = str(message['From'])
             if my_email is not None:
                 my_domain = re.search("@[\w.\-\_]+", str(my_email))
                 if my_domain is not None:
                      my_domain_str = str(my_domain.group ());
                      my_domain_str = my_domain_str.lower()
             if my_email is not None:
                 my_name = re.search("[\w.\-\_]+@", str(my_email))
                 if my_name is not None:
                      my_name_str = str(my_name.group ());
                      my_name_str = my_name_str.lower()
             if my_domain is not None:
                 #print(my_domain.group())
                 #print(my_name.group())
                 json = '{"name":"'+my_name_str+'","domain":"'+my_domain_str+'"'
             else:
                 my_email = my_email.replace(",","")
                 my_email = my_email.replace('"','')
                 my_email = str(re.sub(r'[^\x00-\x7f]',r'', my_email)) 
                 my_email = my_email.lower()
                 json = '{"name":"'+my_email+'","domain":"None"';
             if my_date is not None and len(my_date_str) > 1:
                 json = json+',"date":"'+my_date_str+'","size":'+str(statinfo.st_size)+',"id":'+str(nb_parse)
             else:
                 json = json+',"size":'+str(statinfo.st_size)+',"id":'+str(nb_parse)
             if my_server is not None and my_server is not Header:
                 ip = re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', str(my_server))
                 if ip is not None:
                    my_ip = ip.group()
                    json = json+',"ip":"'+str(my_ip)+'"'
                 else:
                    my_ip = ""
                 #ip = re.findall(r'\b25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?\.25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?\.25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?\.25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?\b',my_server)
                 #ip = re.findall( r'[0-9]+(?:\.[0-9]+){1,3}', my_server )
                 #ip = re.findall(r'[\d.-]+', my_server) 
             else:
                 json = json
             if my_id is not None and my_id is not Header:
                 try:
                    my_id =my_id.strip()
                    my_id =my_id.strip('\n')
                    json = json+',"Message-ID":"'+my_id+'","file":"'+file+'"}'
                 except:
                    json = json+',"file":"'+file+'"}'
             else:
                 json = json+',"file":"'+file+'"}'
             print(json)
             try:
                res = es.index(index=es_keys,doc_type='emlx',id=nb_parse,body=json)
             except:
                nb_error += 1   
             nb_parse += 1
             #print(plist)
   print(nb_parse)

 

Et désolé si le programme n’est pas très propre et sans commentaire … le but est surtout de jouer avec Elasticsearch/Kibana !

MacOS : Python : Découverte de l’API Python Elasticsearch

24 x served & 9 x viewed

J’ai voulu faire un premier programme afin de découvrir l’API Elasticsearch, comme base d’information j’ai pris mes emails. C’est assez simple, toutes les personnes sous MacOS ont des emails …

Voici donc le petit programme en Python (pour Michel) : il suffit de changer MonUser.

#!/usr/bin/env python3

import email
import plistlib
import re
import glob, os
from datetime import datetime
from email.utils import parsedate_to_datetime
from email.header import Header, decode_header, make_header
from elasticsearch import Elasticsearch 

class Emlx(object):
        def __init__(self):
            super(Emlx, self).__init__()
            self.bytecount = 0
            self.msg_data = None
            self.msg_plist = None

        def parse(self, filename_path):
            with open(filename_path, "rb") as f:
                self.bytecount = int(f.readline().strip())
                self.msg_data = email.message_from_bytes(f.read(self.bytecount))
                self.msg_plist = plistlib.loads(f.read())
            return self.msg_data, self.msg_plist

if __name__ == '__main__':
   msg = Emlx()
   nb_parse = 0
   path_mail = "/Users/MonUser/Library/Mail/V6/"
   es_keys = "mail"
   es=Elasticsearch([{'host':'localhost','port':9200}])
   for root, dirs, files in os.walk(path_mail):
      for file in files:
          if file.endswith(".emlx"):
             file_full = os.path.join(root, file)
             message, plist = msg.parse(file_full)
             statinfo = os.stat(file_full)
             my_date = message['Date']
             my_id = message['Message-ID']
             my_server = message['Received']
             if my_date is not None and my_date is not Header:
                 my_date_str = datetime.fromtimestamp(parsedate_to_datetime(my_date).timestamp()).strftime('%Y-%m-%dT%H:%M:%S')
             my_email = str(message['From'])
             if my_email is not None:
                 my_domain = re.search("@[\w.\-\_]+", str(my_email))
             if my_email is not None:
                 my_name = re.search("[\w.\-\_]+@", str(my_email))
             if my_domain is not None:
                 #print(my_domain.group())
                 #print(my_name.group())
                 json = '{"name":"'+my_name.group()+'","domain":"'+my_domain.group()+'"'
             else:
                 my_email = my_email.replace(",","")
                 my_email = my_email.replace('"','')
                 json = '{"name":"'+my_email+'","domain":"None"';
             if my_date is not None:
                 json = json+',"date":"'+my_date_str+'","size":'+str(statinfo.st_size)+',"id":'+str(nb_parse)
             else:
                 json = json+',"size":'+str(statinfo.st_size)+',"id":'+str(nb_parse)
             if my_server is not None and my_server is not Header:
                 ip = re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', str(my_server))
                 if ip is not None:
                    my_ip = ip.group()
                    json = json+',"ip":"'+str(my_ip)+'"'
                 else:
                    my_ip = ""
                 #ip = re.findall(r'\b25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?\.25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?\.25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?\.25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?\b',my_server)
                 #ip = re.findall( r'[0-9]+(?:\.[0-9]+){1,3}', my_server )
                 #ip = re.findall(r'[\d.-]+', my_server) 
             else:
                 json = json
             if my_id is not None and my_id is not Header:
                 my_id =my_id.strip()
                 my_id =my_id.strip('\n')
                 json = json+',"Message-ID":"'+my_id+'","file":"'+file+'"}'
             else:
                 json = json+',"file":"'+file+'"}'
             print(json)
             res = es.index(index=es_keys,doc_type='emlx',id=nb_parse,body=json)
             nb_parse += 1
             #print(plist)
   print(nb_parse)

Le but de ce programme c’est simplement de mieux comprendre l’API.
Pour le lancer j’ai fait :

sudo python3 ParseEmail.py > email-json.txt

A noter que le Terminal doit avoir certains droits pour que cela fonctionne : http://www.cyber-neurones.org/2019/11/macos-acces-a-library-mail-via-un-terminal/ .

Ensuite pour faire un petit contrôle il suffit de faire : http://localhost:9200/mail/_mappings .

{"mail":{"mappings":{"emlx":{"properties":{"Message-ID":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"date":{"type":"date"},"domain":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"file":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"id":{"type":"long"},"ip":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"name":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"size":{"type":"long"}}}}}}

Je viens de lancer le programme … c’est très long, voici ce qu’il a pour l’instant en base (sur les 20 dernières années) :

En mode Histogramme :

OpenDataGouv : Transaction immobilière sur Biot

275 x served & 158 x viewed

En quelques étapes …

Etape 1: Voir un tweet passer de @ollybret :

Etape 2 : Vérifier l’information sur @datagouvfr : le lien étant : https://www.data.gouv.fr/fr/datasets/demandes-de-valeurs-foncieres/ 

Et se faire un profil :

Etape 3 : Voir du JSON : Micro-API DVF (Demande de Valeurs Foncières) et une API :

Voir même le résultat en fonction de la commune ou du code postal :

  • http://api.cquest.org/dvf?code_commune=94068 ( code_commune = code INSEE de la commune (ex: 94068) )
  • http://api.cquest.org/dvf?code_postal=89110 ( code_postal = code postal )

Superbe travail de Christian Quest ( sur Twitter @cq94 ‏) avec en plus la mise du code sur GitHub : https://github.com/cquest/dvf_as_api .( @github )

Etape 4 : Sortir le python ( @ThePSF ) et se faire une petite carte sur Biot :

$ python3 API_DVF_ToStaticMapColor.py 
Number total of request 1
Number total of pin 2598 Before date 624 After date 1974

Et publier les sources ( https://github.com/CYBERNEURONES/Python )  : les conditions d’utilisations : http://data.cquest.org/dgfip_dvf/conditions-generales-dutilisation.pdf.

#
# for Python 3
# 
#   ARIAS Frederic
#   Sorry ... It's difficult for me the python :)
#

from time import gmtime, strftime
import time
import json
import requests
from datetime import *
from staticmap import StaticMap, CircleMarker

m = StaticMap(1100, 1100, url_template='http://a.tile.osm.org/{z}/{x}/{y}.png')

codepostal = "06410"
date_inondation = datetime.strptime("2015-10-03",'%Y-%m-%d')
nb_request = 0
nb_plot = 0
after_inondation = 0
before_inondation = 0
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
url_notes = (
    "http://api.cquest.org/dvf?"
    "code_postal="+codepostal
)
try:
    resp = requests.get(url_notes, headers=headers)
    nb_request += 1
    resp.raise_for_status()
    resp_dict = resp.json()
    #print(resp_dict);
    for my_resource in resp_dict['resultats']:
        #print(my_resource)
        long = my_resource.get('lon')
        lat = my_resource.get('lat')
        my_date = datetime.strptime(my_resource.get('date_mutation'),'%Y-%m-%d')
        if (long) and (lat):
           nb_plot += 1
           #print("Long:",float(long),"Lat:",float(lat),"Date:",my_date);
           if my_date > date_inondation : 
             marker = CircleMarker((float(long), float(lat)), '#0036FF', 8)
             after_inondation += 1
           else :         
             marker = CircleMarker((float(long), float(lat)), '#FF3600', 8)
             before_inondation += 1
           m.add_marker(marker)
except requests.exceptions.HTTPError as e:
    print("Bad HTTP status code:", e)
except requests.exceptions.RequestException as e:
    print("Network error:", e)

image = m.render(zoom=13)
image.save('mymap_zoom13_col.png')
image = m.render(zoom=14)
image.save('mymap_zoom14_col.png')
image = m.render(zoom=15)
image.save('mymap_zoom15_col.png')

print ("Number total of request",nb_request)
print ("Number total of pin",nb_plot,"Before date",before_inondation,"After date",after_inondation)

Etape n°5 : Voir le résultat :

A la base je voulais localiser les achats du Fond Barnier, mais je n’ai rien trouvé … sniff.

J’ai aussi fait un programme ( https://github.com/CYBERNEURONES/Python/blob/master/API_DVF_ToStaticMapColor_V2.py )  pour travailler avec le code insee …. mais pas mieux :

Toutes les données

Expropriation, Adjudication.

A suivre …