MacOS : Python : Découverte de l’API Python Elasticsearch/Kibana (Version V2)

87 x served & 17 x viewed

J’ai finalement fait une version V2 qui corrige quelques problèmes (exceptions levées sur quelques emails ou quelques noms de domaine). Il faut remplacer « MonLogin » par son propre identifiant dans le chemin du dossier Mail.

#!/usr/bin/env python3

import email
import plistlib
import re
import glob, os
import string
from datetime import datetime
from email.utils import parsedate_to_datetime
from email.header import Header, decode_header, make_header
from elasticsearch import Elasticsearch 

class Emlx(object):
    """Reader for ``.emlx`` files as laid out by :meth:`parse`.

    The attributes always describe the most recently parsed file:
    ``bytecount`` is the size announced on the first line, ``msg_data`` the
    parsed e-mail message and ``msg_plist`` the decoded trailing plist.
    """

    def __init__(self):
        super().__init__()
        self.bytecount = 0
        self.msg_data = None
        self.msg_plist = None

    def parse(self, filename_path):
        """Parse one file and return ``(message, plist)``.

        The file is read in binary mode in three steps: the first line holds
        the message length in bytes, the next ``bytecount`` bytes are the
        e-mail itself, and everything that remains is a property list.
        """
        with open(filename_path, "rb") as handle:
            size_line = handle.readline()
            self.bytecount = int(size_line.strip())

            raw_message = handle.read(self.bytecount)
            self.msg_data = email.message_from_bytes(raw_message)

            trailer = handle.read()
            self.msg_plist = plistlib.loads(trailer)

        return self.msg_data, self.msg_plist

if __name__ == '__main__':
   msg = Emlx()
   nb_parse = 0
   nb_error = 0
   printable = set(string.printable)
   path_mail = "/Users/MonLogin/Library/Mail/V6/"
   es_keys = "mail"
   es=Elasticsearch([{'host':'localhost','port':9200}])
   for root, dirs, files in os.walk(path_mail):
      for file in files:
          if file.endswith(".emlx"):
             file_full = os.path.join(root, file)
             message, plist = msg.parse(file_full)
             statinfo = os.stat(file_full)
             my_date = message['Date']
             my_id = message['Message-ID']
             my_server = message['Received']
             my_date_str = ""
             if my_date is not None and my_date is not Header:
                 try:
                   my_date_str = datetime.fromtimestamp(parsedate_to_datetime(my_date).timestamp()).strftime('%Y-%m-%dT%H:%M:%S')
                 except :
                   my_date_str = ""
             my_email = str(message['From'])
             if my_email is not None:
                 my_domain = re.search("@[\w.\-\_]+", str(my_email))
                 if my_domain is not None:
                      my_domain_str = str(my_domain.group ());
                      my_domain_str = my_domain_str.lower()
             if my_email is not None:
                 my_name = re.search("[\w.\-\_]+@", str(my_email))
                 if my_name is not None:
                      my_name_str = str(my_name.group ());
                      my_name_str = my_name_str.lower()
             if my_domain is not None:
                 #print(my_domain.group())
                 #print(my_name.group())
                 json = '{"name":"'+my_name_str+'","domain":"'+my_domain_str+'"'
             else:
                 my_email = my_email.replace(",","")
                 my_email = my_email.replace('"','')
                 my_email = str(re.sub(r'[^\x00-\x7f]',r'', my_email)) 
                 my_email = my_email.lower()
                 json = '{"name":"'+my_email+'","domain":"None"';
             if my_date is not None and len(my_date_str) > 1:
                 json = json+',"date":"'+my_date_str+'","size":'+str(statinfo.st_size)+',"id":'+str(nb_parse)
             else:
                 json = json+',"size":'+str(statinfo.st_size)+',"id":'+str(nb_parse)
             if my_server is not None and my_server is not Header:
                 ip = re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', str(my_server))
                 if ip is not None:
                    my_ip = ip.group()
                    json = json+',"ip":"'+str(my_ip)+'"'
                 else:
                    my_ip = ""
                 #ip = re.findall(r'\b25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?\.25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?\.25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?\.25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?\b',my_server)
                 #ip = re.findall( r'[0-9]+(?:\.[0-9]+){1,3}', my_server )
                 #ip = re.findall(r'[\d.-]+', my_server) 
             else:
                 json = json
             if my_id is not None and my_id is not Header:
                 try:
                    my_id =my_id.strip()
                    my_id =my_id.strip('\n')
                    json = json+',"Message-ID":"'+my_id+'","file":"'+file+'"}'
                 except:
                    json = json+',"file":"'+file+'"}'
             else:
                 json = json+',"file":"'+file+'"}'
             print(json)
             try:
                res = es.index(index=es_keys,doc_type='emlx',id=nb_parse,body=json)
             except:
                nb_error += 1   
             nb_parse += 1
             #print(plist)
   print(nb_parse)

 

Et désolé si le programme n’est pas très propre et sans commentaire … le but est surtout de jouer avec Elasticsearch/Kibana !

1 réflexion sur « MacOS : Python : Découverte de l’API Python Elasticsearch/Kibana (Version V2) »

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Time limit is exhausted. Please reload CAPTCHA.