Machine Learning: il Segreto è il Modello, ma anche il Codice!
Nella maggior parte dei lavori nell’ambito di Machine Learning, non si fa ricerca per migliorare l’architettura di un modello o per progettare una nuova loss function. Nella maggior parte dei casi si deve utilizzare ciò che già esiste e adattarlo al proprio caso d’uso.
È quindi molto importante ottimizzare il progetto in termini di architettura del software e di implementazione in generale. Tutto parte da qui: si vuole un codice ottimale, che sia pulito, riutilizzabile e che funzioni il più velocemente possibile. Threading è una libreria nativa di Python che non viene utilizzata così spesso come dovrebbe.
Riguardo i Thread
I thread sono un modo per un processo di dividersi in due o più attività in esecuzione simultanea (o pseudo-simultanea). Un thread è contenuto all’interno di un processo e thread diversi dello stesso processo condividono le stesse risorse.
In questo articolo non si parlo di multiprocessing, ma la libreria per il multiprocessing di Python funziona in modo molto simile a quella per il multithreading.
In generale:
- Il multithreading è ottimo per i compiti legati all’I/O, come la chiamata di un’API all’interno di un ciclo for
- Il multiprocessing è usato per i compiti legati alla CPU, come la trasformazione di più dati tabellari in una volta sola
Quindi, se vogliamo eseguire più cose contemporaneamente, possiamo farlo usando i thread. La libreria Python per sfruttare i thread si chiama threading.
Cominciamo in modo semplice. Voglio che due thread Python stampino qualcosa contemporaneamente. Scriviamo due funzioni che contengono un ciclo for per stampare alcune parole.
def print_hello():
for x in range(1_000):
print("hello")
def print_world():
for x in range(1_000):
print("world")
Ora, se li eseguo uno dopo l’altro, vedrò nel mio terminale 1.000 volte la parola “hello” seguita da 1.000 “world”.
Utilizziamo invece i thread. Definiamo due thread e assegniamo a ciascuno di essi le funzioni definite in precedenza. Poi avviamo i thread. Dovreste vedere la stampa di “hello” e “world” alternarsi sul vostro terminale.
Se prima di continuare l’esecuzione del codice si vuole aspettare che i thread finiscano, è possibile farlo utilizzando: join().
import threding
thread_1 = threding.Thread(target = print_hello)
thread_2 = threding.Thread(target = print_world)
thread_1.start()
thread_2.start()
# wait for the threads to finish before continuing running the code
thread_1.join()
thread_2.join()
print("do other stuff")
Lock delle risorse dei thread
A volte può accadere che due o più thread modifichino la stessa risorsa, ad esempio una variabile contenente un numero.
Un thread ha un ciclo for che aggiunge sempre uno alla variabile e l’altro sottrae uno. Se eseguiamo questi thread insieme, la variabile avrà “sempre” il valore di zero (più o meno). Ma noi vogliamo ottenere un comportamento diverso. Il primo thread che prenderà possesso di questa variabile deve aggiungere o sottrarre 1 fino a raggiungere un certo limite. Poi rilascerà la variabile e l’altro thread sarà libero di prenderne possesso e di eseguire le sue operazioni.
import threading
import time
x = 0
lock = threading.Lock()
def add_one():
global x, lock # use global to work with globa vars
lock.acquire()
while x -10:
x = x -1
print(x)
time.sleep(1)
print("reached minimum")
lock.release()
Nel codice sopra riportato, abbiamo due funzioni. Ciascuna sarà eseguita da un thread. Una volta avviata, la funzione bloccherà la variabile lock, in modo che il secondo thread non possa accedervi finché il primo non ha finito.
thread_1 = threading.Thread(target = add_one)
thread_2 = threading.Thread(target = subtract_one)
thread_1.start()
thread_2.start()
Lock usando un semaforo
Possiamo ottenere un risultato simile a quello ottenuto sopra utilizzando i semafori. Supponiamo di voler far accedere a una funzione un numero totale di thread contemporaneamente. Ciò significa che non tutti i thread avranno accesso a questa funzione, ma solo 5, per esempio. Gli altri thread dovranno aspettare che alcuni di questi 5 finiscano i loro calcoli per avere accesso alla funzione ed eseguire lo script. Possiamo ottenere questo comportamento utilizzando un semaforo e impostando il suo valore a 5. Per avviare un thread con un argomento, possiamo usare args nell’oggetto Thread.
import time
import threading
semaphore = threading.BoudnedSemaphore(value=5)
def func(thread_number):
print(f"{thread_number} is trying to access the resource")
semaphore.acquire()
print(f"{thread_number} granted access to the resource")
time.sleep(12) #fake some computation
print(f"{thread_number} is releasing resource")
semaphore.release()
if __name__ == "__main__":
for thread_number in range(10):
t = threading.Thread(target = func, args = (thread_number,)
t.start()
time.sleep(1)
Eventi
Gli eventi sono semplici meccanismi di segnalazione usati per coordinare i thread. Si può pensare a un evento come a un flag che si può selezionare o deselezionare, e gli altri thread possono aspettare che venga selezionato prima di continuare il loro lavoro.
Ad esempio, nel seguente esempio, il thread_1 che vuole eseguire la funzione “func” deve attendere che l’utente inserisca “sì” e scateni l’evento per poter terminare l’intera funzione.
import threading
event = threading.Event()
def func():
print("This event function is waiting to be triggered")
event.wait()
print("event is triggered, performing action now")
thread_1 = threading.Thread(target = func)
thread_1.start()
x = input("Do you want to trigger the event? \n")
if x == "yes":
event.set()
else
print("you chose not to trigger the event")
Daemon Thread
Si tratta semplicemente di thread che vengono eseguiti in background. Lo script principale termina anche se questo thread in background è ancora in esecuzione. Ad esempio, si può usare un thread demone per leggere continuamente da un file che viene aggiornato nel tempo.
Scriviamo uno script in cui un thread demone legge continuamente da un file e aggiorna una variabile stringa e un altro thread stampa su console il contenuto di tale variabile.
import threading
import time
path = "myfile.txt"
text = ""
def read_from_file():
global path, text
while True:
with open(path, "r") as f:
text = f.read()
time.sleep(4)
def print_loop():
for x in range(30):
print(text)
time.sleep(1)
thread_1 = threading.Thread(target = read_from_file, daemon = True)
thread_2 = threading.Thread(target = print_loop)
thread_1.start()
thread_2.start()
Queues (code)
Una coda è un insieme di elementi che obbedisce al principio first-in/first-out (FIFO). È un metodo per gestire strutture di dati in cui il primo elemento viene elaborato per primo e l’elemento più recente per ultimo.
Possiamo anche cambiare l’ordine di priorità con cui processiamo gli elementi della collezione. LIFO, ad esempio, sta per Last-in/First-out. Oppure, in generale, possiamo avere una coda di priorità in cui possiamo scegliere manualmente l’ordine.
Se più thread vogliono lavorare su un elenco di elementi, ad esempio un elenco di numeri, potrebbe verificarsi il problema che due thread eseguano calcoli sullo stesso elemento. Vogliamo evitare questo problema. Perciò possiamo avere una coda condivisa tra i thread e, quando un thread esegue il calcolo su un elemento, questo elemento viene rimosso dalla coda. Vediamo un esempio.
import queue
q = queue.Queue() # it can also be a LifoQueue or PriorityQueue
number_list = [10, 20, 30, 40, 50, 60, 70, 80]
for number in number_list:
q.put(number)
print(q.get()) # -> 10
print(1.het()) # -> 20
Un esempio di thread in un progetto di Machine Learning
Supponiamo di lavorare a un progetto che richiede una pipeline di streaming e preelaborazione dei dati. Questo accade in molti progetti con dispositivi IoT o qualsiasi tipo di sensore. Un thread demone in background può recuperare e preelaborare continuamente i dati mentre il thread principale si concentra sull’inferenza.
Per esempio, in un caso semplice in cui devo sviluppare un sistema di classificazione delle immagini in tempo reale utilizzando il feed della mia telecamera. Imposterei il mio codice con 2 thread:
- Recuperare le immagini dal feed della telecamera in tempo reale.
- Passare le immagini a un modello di AI per l’inferenza.
import threading
import time
import queue
import random
# Sfake image classifier
def classify_image(image):
time.sleep(0.5) # fake the model inference time
return f"Classified {image}"
def camera_feed(image_queue, stop_event):
while not stop_event.is_set():
# Simulate capturing an image
image = f"Image_{random.randint(1, 100)}"
print(f"[Camera] Captured {image}")
image_queue.put(image)
time.sleep(1) # Simulate time between captures
def main_inference_loop(image_queue, stop_event):
while not stop_event.is_set() or not image_queue.empty():
try:
image = image_queue.get(timeout=1) # Fetch image from the queue
result = classify_image(image)
print(f"[Model] {result}")
except queue.Empty:
continue
if __name__ == "__main__":
image_queue = queue.Queue()
stop_event = threading.Event()
camera_thread = threading.Thread(target=camera_feed, args=(image_queue, stop_event), daemon=True)
camera_thread.start()
try:
main_inference_loop(image_queue, stop_event)
except KeyboardInterrupt:
print("Shutting down...")
stop_event.set() # Signal the camera thread to stop
finally:
camera_thread.join() # Ensure the camera thread terminates properly
print("All threads terminated.")
In questo semplice esempio, abbiamo:
- Un thread demone: L’input della telecamera viene eseguito in background, in modo da non bloccare l’uscita del programma al completamento del thread principale.
- Evento per il coordinamento: Questo evento stop_event consente al thread principale di segnalare al thread demone di terminare.
- Coda per la comunicazione: image_queue assicura una comunicazione sicura tra i thread.
Conclusioni
In questo tutorial vi ho mostrato come utilizzare la libreria di threading in Python, affrontando concetti fondamentali come lock, semafori ed eventi, oltre a casi d’uso più avanzati come i thread daemon e le code.
Vorrei sottolineare che il threading non è solo skill tecnica, ma piuttosto una mentalità che consente di scrivere codice pulito, efficiente e riutilizzabile. Quando si gestiscono chiamate API, si elaborano flussi di dati di sensori o si costruisce un’applicazione di AI in tempo reale, il threading consente di costruire sistemi robusti, reattivi e pronti a scalare.
L'articolo Machine Learning: il Segreto è il Modello, ma anche il Codice! proviene da il blog della sicurezza informatica.