Python: Creating JPEGs from a CSV file, with location EXIF tags

I wrote a small program that generates JPEG images from a CSV file. The goal is to have them show up as markers in Google Photos.

Here is an example CSV file:

$ cat list.csv 
Nom,Année,Mois,Jours,Lieux,Pays
Deplacement sur Paris,2018,11,01,Paris,France
Deplacement sur Londres,2011,11,01,London,UK

Here is the program:

from PIL import Image, ImageDraw, ImageFont
import piexif
from datetime import datetime
import csv
from geopy.geocoders import Nominatim
from GPSPhoto import gpsphoto

geolocator = Nominatim(user_agent="Your_Name")

with open('list.csv') as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    line_count = 0
    for row in csv_reader:
        if line_count == 0:
            # Skip the header line
            line_count += 1
        else:
            name = '%s \n On %d/%d/%d \n at %s, %s' % (str(row[0]), int(row[1]), int(row[2]), int(row[3]), str(row[4]), str(row[5]))
            print('\t %s ' % name)
            line_count += 1
            filename = 'image-%d.jpg' % (line_count)

            # Create the background image and draw the caption on it
            img = Image.new('RGB', (1024, 800), color=(73, 109, 137))

            d = ImageDraw.Draw(img)
            fontsize = 80
            font = ImageFont.truetype('/usr/share/fonts/truetype/msttcorefonts/Arial.ttf', fontsize)
            d.text((10, 10), name, font=font)
            img.save(filename)

            # Set the EXIF dates from the CSV values
            exif_dict = piexif.load(filename)
            new_date = datetime(int(row[1]), int(row[2]), int(row[3]), 0, 0, 0).strftime("%Y:%m:%d %H:%M:%S")
            exif_dict['0th'][piexif.ImageIFD.DateTime] = new_date
            exif_dict['Exif'][piexif.ExifIFD.DateTimeOriginal] = new_date
            exif_dict['Exif'][piexif.ExifIFD.DateTimeDigitized] = new_date
            exif_bytes = piexif.dump(exif_dict)
            piexif.insert(exif_bytes, filename)

            # Geocode "place,country" and write the GPS tags
            address = '%s,%s' % (row[4], row[5])
            location = geolocator.geocode(address)
            print('\t\t %f %f %d' % (location.latitude, location.longitude, location.altitude))
            photo = gpsphoto.GPSPhoto(filename)
            info = gpsphoto.GPSInfo((location.latitude, location.longitude), alt=int(location.altitude), timeStamp=new_date)
            photo.modGPSData(info, filename)

    print('Processed %d lines.' % line_count)


For it to work, you need to have installed:

# pip install geopy
# pip install GPSPhoto
# pip install exifread
# pip install piexif
# pip install Pillow

To install the fonts on Ubuntu:

# sudo apt-get install gsfonts gsfonts-other gsfonts-x11 ttf-mscorefonts-installer t1-xfree86-nonfree fonts-alee ttf-ancient-fonts fonts-arabeyes fonts-arphic-bkai00mp fonts-arphic-bsmi00lp fonts-arphic-gbsn00lp fonts-arphic-gkai00mp fonts-atarismall fonts-dustin fonts-f500 fonts-sil-gentium ttf-georgewilliams ttf-isabella fonts-larabie-deco fonts-larabie-straight fonts-larabie-uncommon ttf-sjfonts ttf-staypuft ttf-summersby fonts-ubuntu-title ttf-xfree86-nonfree xfonts-intl-european xfonts-jmk xfonts-terminus
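
If that msttcorefonts path is missing on your machine, fc-list (from the fontconfig package) can locate an installed TrueType font to point the script at instead:

$ fc-list | grep -i arial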

To check the result:

# exiftool image-2.jpg 
ExifTool Version Number         : 11.88
File Name                       : image-2.jpg
Directory                       : .
File Size                       : 36 kB
File Modification Date/Time     : 2021:02:22 14:55:29+01:00
File Access Date/Time           : 2021:02:22 14:55:30+01:00
File Inode Change Date/Time     : 2021:02:22 14:55:29+01:00
File Permissions                : rw-rw-r--
File Type                       : JPEG
File Type Extension             : jpg
MIME Type                       : image/jpeg
JFIF Version                    : 1.01
Resolution Unit                 : None
X Resolution                    : 1
Y Resolution                    : 1
Exif Byte Order                 : Big-endian (Motorola, MM)
Modify Date                     : 2018:11:01 00:00:00
Date/Time Original              : 2018:11:01 00:00:00
Create Date                     : 2018:11:01 00:00:00
GPS Latitude Ref                : North
GPS Longitude Ref               : East
GPS Altitude Ref                : Above Sea Level
GPS Time Stamp                  : 00:00:00
GPS Processing Method           : GPS
GPS Date Stamp                  : 2018:11:01
Image Width                     : 1024
Image Height                    : 800
Encoding Process                : Baseline DCT, Huffman coding
Bits Per Sample                 : 8
Color Components                : 3
Y Cb Cr Sub Sampling            : YCbCr4:2:0 (2 2)
Image Size                      : 1024x800
Megapixels                      : 0.819
GPS Altitude                    : 0 m Above Sea Level
GPS Date/Time                   : 2018:11:01 00:00:00Z
GPS Latitude                    : 48 deg 51' 24.11" N
GPS Longitude                   : 2 deg 21' 5.26" E
GPS Position                    : 48 deg 51' 24.11" N, 2 deg 21' 5.26" E
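
The same check can also be done from Python with piexif; a minimal read-back sketch (GPS coordinates are stored as rational triplets, so they need converting back to decimal degrees):

import piexif

exif_dict = piexif.load('image-2.jpg')
print(exif_dict['0th'][piexif.ImageIFD.DateTime])        # b'2018:11:01 00:00:00'
print(exif_dict['GPS'][piexif.GPSIFD.GPSLatitudeRef])    # b'N'

# Each coordinate is ((deg_num, deg_den), (min_num, min_den), (sec_num, sec_den))
def to_degrees(dms):
    d, m, s = (num / den for num, den in dms)
    return d + m / 60 + s / 3600

print(to_degrees(exif_dict['GPS'][piexif.GPSIFD.GPSLatitude]))   # ~48.8567 for Paris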


Slack: Migrating access-log data to MariaDB for use in Grafana

To use the script you need:

  • MariaDB
  • Python
  • Grafana.

Slack lets you download a CSV file (access_logs.csv) containing the following columns:

  • Date Accessed
  • User Agent – Simple
  • User Agent – Full
  • IP Address
  • Number of Logins
  • Last Date Accessed

A quick reminder on creating the database and its user:

$ sudo mysql -u root

MariaDB [(none)]> create database SLACK;

MariaDB [(none)]> CREATE USER 'slack'@'localhost' IDENTIFIED BY 'slack';

MariaDB [(none)]> GRANT ALL PRIVILEGES ON SLACK.* TO 'slack'@'localhost';

MariaDB [(none)]> FLUSH PRIVILEGES;

MariaDB [(none)]> \quit
Bye

Another quick reminder, for installing the Python modules that are not available by default:

$ sudo pip install python-dateutil
$ sudo pip install mysql-connector-python
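
dateutil is needed because the "Date Accessed" values are free-form dates, sometimes followed by a "(timezone)" chunk that the script strips with rsplit('(', 1). A minimal sketch with a hypothetical value:

from dateutil.parser import parse

# Hypothetical Slack date value; the real export may differ slightly
raw = 'Apr 17, 2020 09:13:39 (Europe/Paris)'
clean = raw.rsplit('(', 1)[0]   # drop the trailing "(timezone)" part
print(parse(clean))             # 2020-04-17 09:13:39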

The program source (also available here: https://github.com/farias06/Grafana/blob/master/Slack_CSV_insert.py ):

#! /usr/bin/env python3
# -*- coding: utf-8 -*-

import csv
from dateutil.parser import parse
import mysql.connector
from mysql.connector import errorcode
from mysql.connector import connection


cnx = connection.MySQLConnection(user='slack', password='slack',
                                 host='127.0.0.1',
                                 database='SLACK')
cursor = cnx.cursor()

#cursor.execute("DROP TABLE SLACK;")
#cursor.execute("CREATE TABLE SLACK (DATE datetime, DATE_LAST datetime, USER_AGENT varchar(50), USER_AGENT_FULL varchar(256), IP varchar(26), NUMBER int);")

# Start from an empty table on every run
cursor.execute("DELETE FROM SLACK")
cnx.commit()

with open('access_logs.csv', 'r') as csvfile:
    reader = csv.reader(csvfile, quotechar='"')
    for row in reader:
        # Drop the trailing "(timezone)" chunk before parsing the dates
        MyDate = row[0].rsplit('(', 1)[0]
        if MyDate == "Date Accessed":
            # Header line
            continue
        Dt = parse(MyDate)
        MyUser = row[1].replace("'", " ")
        MyUserFull = row[2].replace("'", " ")
        MyIP = row[3]
        MyNumber = row[4]
        MyDateLast = row[5].rsplit('(', 1)[0]
        DtLast = parse(MyDateLast)
        try:
            # Parameterized query: no quoting problems, no SQL injection
            cursor.execute(
                "INSERT INTO SLACK (DATE, USER_AGENT, USER_AGENT_FULL, IP, DATE_LAST, NUMBER)"
                " VALUES (%s, %s, %s, %s, %s, %s)",
                (Dt, MyUser, MyUserFull, MyIP, DtLast, int(MyNumber)))
        except mysql.connector.Error as err:
            print("Something went wrong: {}".format(err))
            if err.errno == errorcode.ER_BAD_TABLE_ERROR:
                print("Table SLACK is missing, create it first")

cnx.commit()
cursor.close()
cnx.close()

# END

To run the program:

$ python Slack_CSV_insert.py

Then, to visualize the data in Grafana, several queries are possible for the metric ($__timeFilter is a Grafana macro that restricts the query to the dashboard's time range):

By IP:

SELECT
  UNIX_TIMESTAMP(date) as time_sec,
  SUM(number) as value,
  ip as metric
FROM SLACK
WHERE $__timeFilter(date)
GROUP BY ip, day(date), month(date), year(date)
ORDER BY date ASC

By User Agent:

SELECT
  UNIX_TIMESTAMP(date) as time_sec,
  SUM(number) as value,
  user_agent as metric
FROM SLACK
WHERE $__timeFilter(date)
GROUP BY user_agent, day(date), month(date), year(date)
ORDER BY date ASC

By full User Agent:

SELECT
  UNIX_TIMESTAMP(date) as time_sec,
  SUM(number) as value,
  user_agent_full as metric
FROM SLACK
WHERE $__timeFilter(date)
GROUP BY user_agent_full, day(date), month(date), year(date)
ORDER BY date ASC

I did notice one oddity: I use the Slack desktop client on Linux, yet the logs show no "Linux Desktop Application" user agent.

macOS: Python: Removing duplicate emails with the Elasticsearch/Kibana Python API (V3)

In the end, among my 200,000 emails I think there are duplicates... so I will take advantage of the export to Elasticsearch/Kibana to find out. An email with the same size and the same MD5 checksum is considered a duplicate.

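The duplicate test itself fits in a few lines; here is a minimal sketch of the size+MD5 idea, with a hypothetical file list (the full script below walks the Mail folder and keeps a list of checksums instead):

import hashlib, os

def md5(fname):
    hash_md5 = hashlib.md5()
    with open(fname, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b''):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

seen = set()
for path in ['a.emlx', 'b.emlx']:            # hypothetical file list
    key = (os.path.getsize(path), md5(path))
    if key in seen:
        print('duplicate:', path)            # candidate for os.unlink(path)
    else:
        seen.add(key)

A set keyed on (size, MD5) keeps each lookup constant-time, which matters with 200,000 files.
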
So here is version V3 (with the actual file deletion, os.unlink(path), left commented out):

#!/usr/bin/env python3

import email
import plistlib
import hashlib
import re
import glob, os
import string
from datetime import datetime
from email.utils import parsedate_to_datetime
from email.header import Header, decode_header, make_header
from elasticsearch import Elasticsearch 

class Emlx(object):
        def __init__(self):
            super(Emlx, self).__init__()
            self.bytecount = 0
            self.msg_data = None
            self.msg_plist = None

        def parse(self, filename_path):
            # An .emlx file starts with the message length in bytes,
            # followed by the raw message, then an Apple plist with metadata
            with open(filename_path, "rb") as f:
                self.bytecount = int(f.readline().strip())
                self.msg_data = email.message_from_bytes(f.read(self.bytecount))
                self.msg_plist = plistlib.loads(f.read())
            return self.msg_data, self.msg_plist

def md5(fname):
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

if __name__ == '__main__':
   msg = Emlx()
   nb_parse = 0
   nb_error = 0
   save_space = 0
   list_email = []
   printable = set(string.printable)
   path_mail = "/Users/MonLogin/Library/Mail/V6/"
   es_keys = "mail"
   es=Elasticsearch([{'host':'localhost','port':9200}])
   for root, dirs, files in os.walk(path_mail):
      for file in files:
          if file.endswith(".emlx"):
             file_full = os.path.join(root, file)
             my_check = md5(root+'/'+file)
             my_count = list_email.count(my_check)
             list_email.append(my_check)
             message, plist = msg.parse(file_full)
             statinfo = os.stat(file_full)
             if (my_count > 0):
                save_space += int(statinfo.st_size)
                #os.unlink(root+'/'+file)
             my_date = message['Date']
             my_id = message['Message-ID']
             my_server = message['Received']
             my_date_str = ""
             if my_date is not None and not isinstance(my_date, Header):
                 try:
                   my_date_str = datetime.fromtimestamp(parsedate_to_datetime(my_date).timestamp()).strftime('%Y-%m-%dT%H:%M:%S')
                 except :
                   my_date_str = ""
             my_email = str(message['From'])
             my_email = str(make_header(decode_header(my_email)))
             my_domain = re.search(r"@[\w.\-_]+", my_email)
             if my_domain is not None:
                 my_domain_str = my_domain.group().lower()
             my_name = re.search(r"[\w.\-_]+@", my_email)
             if my_name is not None:
                 my_name_str = my_name.group().lower()
             json = '{"checksum":"'+my_check+'","count":"'+str(my_count)+'","size":'+str(statinfo.st_size)
             if my_domain is not None:
                 #print(my_domain.group())
                 #print(my_name.group())
                 json = json+',"name":"'+my_name_str+'","domain":"'+my_domain_str+'"'
             else:
                 my_email = my_email.replace(",","")
                 my_email = my_email.replace('"','')
                 my_email = str(re.sub(r'[^\x00-\x7f]',r'', my_email)) 
                 my_email = my_email.lower()
                 json = json+',"name":"'+my_email+'","domain":"None"';
             if my_date is not None and len(my_date_str) > 1:
                 json = json+',"date":"'+my_date_str+'","id":'+str(nb_parse)
             else:
                 json = json+',"id":'+str(nb_parse)
             if my_server is not None and not isinstance(my_server, Header):
                 ip = re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', str(my_server))
                 if ip is not None:
                    my_ip = ip.group()
                    json = json+',"ip":"'+str(my_ip)+'"'
                 else:
                    my_ip = ""
             if my_id is not None and not isinstance(my_id, Header):
                 try:
                    my_id =my_id.strip()
                    my_id =my_id.strip('\n')
                    json = json+',"Message-ID":"'+my_id+'","file":"'+file+'"}'
                 except:
                    json = json+',"file":"'+file+'"}'
             else:
                 json = json+',"file":"'+file+'"}'
             print(json)
             try:
                res = es.index(index=es_keys,doc_type='emlx',id=nb_parse,body=json)
             except:
                nb_error += 1   
             nb_parse += 1
             #print(plist)
   print(nb_parse)
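
Once everything is indexed, the duplicates can be queried back from Python; a minimal sketch with the same client, filtering on the count field (which the script stores as a string, hence the comparison against "0"):

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

# Duplicates were indexed with count > 0; count is a string in this index
res = es.search(index='mail', body={
    'query': {'bool': {'must_not': {'term': {'count.keyword': '0'}}}},
    'size': 10})
print('duplicates:', res['hits']['total'])   # an int on Elasticsearch 6.x
for hit in res['hits']['hits']:
    print(hit['_source']['file'], hit['_source']['checksum'])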

To be continued in V4!

macOS: Python: Discovering the Elasticsearch/Kibana Python API with SVN, for statistics

A small script to push an SVN repository's history to Elasticsearch/Kibana. I previously used StatSVN: https://statsvn.org .

To install on macOS:

$ pip2 install --upgrade pip
$ pip2 install elasticsearch

Here is the program:

import xml.etree.ElementTree as ET
import os
import re
from elasticsearch import Elasticsearch

# Parse the XML log produced by "svn log -v --xml"
tree = ET.parse('svn.log')
root = tree.getroot()

count = 0
nb_error = 0
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
es_keys = "svn"

for logentry in root.iter('logentry'):
   revision = logentry.get('revision')
   author = logentry.find('author').text
   date = logentry.find('date').text
   msg = logentry.find('msg').text
   if msg is not None:
      # Flatten the commit message to a single ASCII line, and drop
      # double quotes so the hand-built JSON below stays valid
      msg = msg.replace("\n", " ")
      msg = msg.replace("\r", " ")
      msg = msg.replace('"', "'")
      msg = str(re.sub(r'[^\x00-\x7F]', ' ', msg))
   paths = logentry.find('paths') 
   for path in paths.findall('path'):
      my_path = path.text
      my_basename = os.path.basename(my_path)
      my_dir = os.path.dirname(my_path)
      count += 1
      if msg is not None:
         json = '{"revision":'+revision+',"author":"'+author+'","date":"'+date+'","msg":"'+msg+'","basename":"'+my_basename+'","folder":"'+my_dir+'"}'
      else:
         json = '{"revision":'+revision+',"author":"'+author+'","date":"'+date+'","basename":"'+my_basename+'","folder":"'+my_dir+'"}'
      print(count,json)
      try:
        res = es.index(index=es_keys,doc_type='svn',id=count,body=json)
      except:
        nb_error += 1  

First, export the SVN log as XML:

$ svn log -v --xml  > svn.log

To check, query http://127.0.0.1:9200/svn/_mappings ; the response looks like this:

{"svn":{"mappings":{"svn":{"properties":{"author":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"basename":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"date":{"type":"date"},"folder":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"msg":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"revision":{"type":"long"}}}}}}