01 - Map Reduce
sabato 28 settembre 2019 16:59
Il Map-Reduce è un framework utile a processare grandi quantitativi di dati in cluster, tramite un algoritmo
parallelo e distribuito. Esso si presta molto bene all'esecuzione di conteggi o somme di liste di parole nei
documenti.
Il Map-Reduce è un metodo di programmazione che prevede essenzialmente due funzioni, il Map e il
Reduce. Il sistema invece si occupa di gestire l'esecuzione in parallelo, coordina i task che eseguono il Map
e il Reduce e si occupa anche dei possibili errori durante l'esecuzione.
Il Map Reduce funziona come segue:
Map: a un certo numero Map task (nodi che effettuano il Map, anche detti Mapper) vengono
- assegnati uno o più chunk da un file system distribuito, i Mapper poi trasformano i chunk in una
sequenza di coppie chiave-valore secondo una logica definita nella funzione.
Shuffle: le coppie chiave-valore vengono raccolte e ordinate in base alla chiave, le chiavi vengono
- poi suddivise fra i Reducer in modo che tutte le coppie con la stessa chiave finiscano nello stesso
Reducer.
Reduce: lavora su una chiave per volta e combina tutti i valori associati a quella chiave in un certo
- modo definito dalla funzione.
Mapper (Map task)
I file di input al Mapper sono costituiti da elementi, che possono essere ad esempio tuple o documenti; un
chunk è una collezione di elementi e nessun elemento è salvato su due chunk. -
La funzione di Map prende un elemento in input e produce zero o più coppie , il tipo di chiavi e valori
sono arbitrari e le chiavi non devono essere necessariamente univoche. Il Mapper può produrre diverse
coppie - con la stessa chiave, anche dallo stesso elemento.
Shuffle
Appena la funzione di Map viene completata, le coppie vengono raggruppate per chiave e i valori
associati a ogni chiave formano una lista. Il raggruppamento viene effettuato dal sistema, senza tener
conto delle funzioni di Map e Reduce.
Il processo di Shuffle conosce quanti Reduce task ci sono, in quanto l'utente comunica al sistema il numero
di Reduce task. Lo shuffle allora, applica una funzione di hash alle chiavi e produce un numero di bucket
da a , ogni coppia in output da un Mapper viene messa in uno degli file locali, ogni file è poi
destinato a un Reduce task. La funzione di hash utilizzata può anche essere decisa dall'utente, in ogni
modo ogni chiave è assegnata a un unico Reduce task, ad esempio si può fare shuffle tramite
indirizzamento delle chiavi dal Mapper verso il Reduce task (chiavi minori di un certo valore a un Reducer,
Big Data Analysis Pagina 1
indirizzamento delle chiavi dal Mapper verso il Reduce task (chiavi minori di un certo valore a un Reducer,
altrimenti a un altro Reducer).
Per effettuare il raggruppamento e la distribuzione ai Reducer, i file di ogni Mapper vengono uniti in un
unico file formato da coppie chiave-lista di valori, in questo modo per ogni chiave , l'input al Reducer è
una coppia del tipo , dove sono tutte le coppie con chiave
provenienti da tutti i Mapper.
Reducer
L'argomento di una funzione di Reduce è una coppia contenente una chiave e una lista di valori associati,
l'output è una sequenza di zero o più coppie , che possono essere dello stesso tipo di quelle in output
dal Mapper o anche diverse. Ci si riferisce all'applicazione della funzione di Reduce a una singola chiave
con i relativi valori associati, con il termine Reducer.
Un Reduce task riceve una o più chiavi con i loro valori associati, in questo modo un reduce task può
effettuare anche più reducers. Gli output di tutti i reduce task vengono poi uniti in un singolo file.
La durata complessiva della funzione di Reduce è pari a quella del Reducer più pieno.
Esempio:
Si vogliono contare le occorrenze di ogni parola all'interno di un insieme di documenti, in questo caso ogni
documento è un elemento. La funzione di Map usa chiavi di tipo stringa (le parole) e valori interi. Il Map
task legge un documento e lo spezza nella sequenza di parole , dopodiché emette una
sequenza di coppie , dove il valore è sempre 1, l'output sarà del tipo .
Un singolo Map task normalmente processa più documenti (tutti quelli compresi in uno o più chunk), se
una parola appare più volte fra tutti i documenti assegnati a quel processo, allora ci saranno più coppie
nel suo output.
Dopo aver raggruppato e indirizzato le coppie ai vari Reduce task, la funzione di Reduce semplicemente
somma fra loro tutti i valori dando in output la parola e la somma. L'output di tutti i Reduce task è una
sequenza di coppie , dove è una parola che appare almeno una volta fra tutti i documenti di input
e è il numero totale di occorrenze di fra tutti i documenti.
Combiner
In alcuni casi, la funzione di Reduce è associativa e commutativa, quindi i valori possono essere combinati
in ogni ordine con lo stesso risultato. Quando la funzione è di questo tipo, si può anticipare parte del
lavoro del Reducer al Mapper. Per esempio, nel conteggio delle parole invece di produrre in uscita dal
Mapper le coppie , si può direttamente applicare la funzione somma all'interno del Map task prima
che l'output venga raggruppato e indirizzato, in questo modo le coppie con chiave generate da un
singolo Map task, vengono rimpiazzate da una coppia , dove è il numero di volte che appare
fra i documenti gestiti dal Map task. È comunque necessario fare il raggruppamento e passare il risultato
al Reduce task, dal momento che tipicamente c'è una coppia con chiave in uscita da ogni Map
task.
Esecuzione del Map Reduce
Un programma che fa uso del Map Reduce viene eseguito come segue:
Big Data Analysis Pagina 2
Il programma forka un processo Master e un certo numero di processi Worker a diversi nodi di calcolo,
normalmente un worker gestisce sia Map task (Map worker) o Reduce task (Reduce worker), ma non
entrambi.
Il Master ha molte responsabilità, la prima è quella di creare un certo numero di Map task e un certo
numero di Reduce task, indicati dal programma utente; questi task saranno assegnati ai processi Worker
da parte del Master. È ragionevole creare tanti Map task quanti sono i chunk del file in input, ma creare
meno Reduce task, in quanto ogni Map task crea un file intermedio per ogni reduce task e se ce ne sono
troppi, allora il numero di questi file esplode. Il Master tiene traccia dello stato di ogni task di Map e
Reduce (inattivo, in esecuzione in un particolare worker, completato). Un processo worker riporta al
Master quando conclude un task e il Master gliene assegna uno nuovo. Il Master inoltre, viene informato
della posizione e dimensione di ogni file intermedio creato dal Map e a quale Reduce task è destinato.
Quando un Reduce task viene assegnato dal Master a un processo worker, a quel task vengono assegnati
tutti i file che formano il suo input.
La cosa peggiore che può capitare è che si rompa il nodo di calcolo del Master, in questo caso l'intero Map
Reduce deve ricominciare, se falliscono altri nodi invece, questi vengono gestiti dal Master. Se per esempio
fallisce un nodo dove è in esecuzione un Map worker, questo viene rilevato dal Master e tutti i task
assegnati a quel worker devono essere rifatti, anche se completati, in quanto il loro output destinato ai
Reduce risiede nel nodo e non è più disponibile. Il Master setta gli stati di ognuno dei task a inattivo e li
schedula in un nuovo worker, inoltre informa anche ogni Reduce task che la posizione del suo input da
quel Map è cambiata.
Se invece fallisce un nodo dove è in esecuzione un Reduce worker, il Master semplicemente setta gli stati
dei suoi Reduce task a inattivo e li rischedula in un nuovo worker.
Algoritmi con Map-Reduce
Prodotto matrice-vettore
Dato che molto spesso le matrici sono sparse, ci conviene memorizzare i valori come una terna, questo ci
permette di risparmiare molta memoria nel caso in cui la matrice sia molto sparsa (comincia ad essere
efficace quando la matrice è sparsa per ):
Il vettore invece, è molto più piccolo della matrice ed in genere riesce a stare in memoria.
1. Ogni Map task opera su un chunck della matrice
Big Data Analysis Pagina 3
1. Ogni Map task opera su un chunck della matrice
Per ogni viene prodotta una coppia chiave valore
2.
3. Il Reducer poi somma tutti i valori associati alla stessa chiave
Esempio:
Il Mapper produrrà quattro coppie chiave, valore:
•
•
•
•
Shuffle:
•
•
Il Reducer poi effettua la somma dei valori:
•
•
Essendo la somma associativa e commutativa, questa può essere anche svolta dal combiner all'interno del
mapper.
Dividendo la matrice in terne, aumentiamo anche il parallelismo, dato che abbiamo la possibilità di
divedere più facilmente la matrice in chunks.
Se il vettore non riesce a stare in memoria, lo divido in pezzi. Dato che ogni elemento del vettore va
moltiplicato per la colonna della matrice corrispondente, divido anche la matrice in colonne, in questo
modo riduco gli accessi in memoria: Map 1
Map Map Map 2
1 2
Operazioni di Algebra Relazionale utilizzando il Map-Reduce
• Selection
Mapper: prende in input le tuple e manda in output al reducer delle coppie , che possono
essere ad esempio (chiave primaria, tupla) oppure (tupla, tupla) ecc., formate solamente da tuple
che soddisfano i criteri della select .
Reducer: passa in output le coppie senza effettuare alcuna operazione (identità).
• Projection - Elimina i duplicati
Mapper: per ogni tupla viene creata una tupla rimuovendo quei componenti che non sono in .
Reducer: emette solo una chiave valore per ogni tupla, eliminando i duplicati.
• UNION - Unisce due tabelle senza duplicati
Si considerano due relazioni e con lo stesso schema.
Mapper: per ogni tupla in input emette una coppia
Big Data Analysis Pagina 4
Mapper: per ogni tupla in input emette una coppia
Reducer: per ogni chiave posso avere uno o due valori, in ogni caso emette , in modo da non
avere duplicati.
• INTERSECTION - Unisce solo le tuple presenti in entrambe le tabelle
Mapper: per ogni tupla in input emette una coppia
Reducer: per ogni chiave posso avere uno o due valori, emette solo se ci sono due valori
associati alla chiave, in questo modo emetto solo le tuple che si trovano in entrambe le tabelle.
• DIFFERENCE - Toglie alla prima tabella solo i valori che sono presenti nella seconda
Mapper: per ogni tupla in emette e per ogni tupla in emette
Reducer: per ogni chiave , emette solo se il valore associato è , ciò vuol dire che la tupla si trova
solamente in .
• NATURAL JOIN
Mapper: per ogni tupla in emette e per ogni tupla in emette
Reducer: ogni chiave è associata a una lista di coppie o
Esempio Join:
Tabella
Tabella
Map:
○
○
○
○
○
○
Shuffle su :
○
○
Reducer, decidiamo in che modo unire i valori, ad esempio uniamo tutti i valori:
○
○
• Raggruppamento e Aggregazione raggruppamento su e aggregazione di
Mapper: produce la coppia chiave valore per ogni tupla
Reducer: applica l'aggregazione alla lista associata ad , il risultato è una coppia
nella quale è il risultato dell'aggregazione all lista.
○ Nel caso in cui ci siano più attributi di raggruppamento, il valore chiave del map è la lista di
questi valori
○ Se c'è più di un'aggregazione, il reduce li applica tutti
Esempio: sapere quante persone ci sono di una certa età
Prodotto fra matrici (riga per colonna) - Two steps
1st step: restituisce i prodotti parziali unendo gli elementi di e sull'indice
Mapper 1st step: M
per ogni è il nome della tabella
Big Data Analysis Pagina 5
M
○ per ogni è il nome della tabella
N
○ per ogni è il nome della tabella
Reducer 1st step: per ogni , esamina la lista dei valori associati. Per ogni valore di , , e per
ogni valore di , , produce un elemento con chiave e valore
Esempio primo step:
Map Step:
○
○
Reduce Step:
○
○
2nd step: effettua ragruppamento e aggregazione sulle coppie precedenti
Mapper 2nd step: identità
Reducer 2nd step: per ogni chiave produce la somma della lista dei valori associati con la chiave.
Il risultato è , dove è il valore nella riga e nella colonna della matrice
Esempio secondo step:
Map step (identity):
○
○
Reducer step:
○
○
○
○
Prodotto fra matrici (riga per colonna) - One step
Map:
○ Per ogni elemento di produce per che va da 1 al numero di
colonne di ( coppie di chiavi per ogni )
○ Per ogni elemento di produce per che va da 1 al numero di righe di
Reduce:
○ hanno una lista associata di tutti i valori di e con tutti i possibili
valori di
○ Per ogni i valori con la stessa vengono moltiplicati e poi i risultati vengono sommati
Esempio: Big Data Analysis Pagina 6
Map:
○
○
○
○
○
○
○
○
Shuffle:
○
○
○
○
Reduce:
○
○
○
○
Differenze:
• Nell'one step vengono prodotte più coppie chiave valore.
○ Two steps: ogni elemento di matrice corrisponde ad un elemento di map.
○ One step: ogni elemento di matrice corrisponde ad o elementi di map.
Communication Cost del Map-Reduce
Il costo del Map-Reduce è proporzionale alla dimensione dei dati in input al percorso di Map-Reduce.
Il costo di un algoritmo è quindi la somma dei costi di comunicazione di tutti i task implementati
dall'algoritmo.
La parte più onerosa del Map-Reduce è la trasmissione dei dati fra le macchine del cluster. Ciò avviene
quando effettuiamo il reduce. Solitamente, il master node fa effettuare il map nella macchina in cui si
trovano i dati, il reduce, invece, non sempre si può effettuare nella stessa macchina, dato che ogni istanza
della stessa chiave si deve trovare sulla stessa macchina.
Il tempo computazionale di ogni task può essere ignorato, dato che:
• L'algoritmo del task è in genere molto semplice, come il conteggio, ed è spesso lineare rispetto alla
dimensione dell'input.
• La velocità di interconnessione del cluster è molto più lenta rispetto alla velocità computazionale dei
processori che eseguono le istruzioni.
• In molte architetture di cluster, c'è competizione quando molti nodi hanno bisogno di comunicare
contemporaneamente.
• Il tempo necessario a muovere i dati in memoria principale può superare il tempo necessario ad
operare sui dati una volta che esso è disponibile in memoria.
Assumendo che il costo di comunicazione sia il costo dominante, perchè ci interessa solo la dimensione
dell'input e non quella dell'output?
1. Se l'output di un task è l'input di un altro task, allora la dimensione dell'output di sarà
considerata quando viene misurata la dimensione di input del task ricevente.
○ Non c'è alcuna ragione di misurare la dimensione di un qualsiasi output, eccetto per quei task i
quali output formano il risultato dell'intero algoritmo
Big Data Analysis Pagina 7
quali output formano il risultato dell'intero algoritmo
2. L'algoritmo di output è raramente grande se confrontato con l'input o con i dati intermedi prodotti
dall'algoritmo.
○ Il motivo è che output molto grandi non possono essere facilmente compresi e utilizzati se non
vengono prima indicizzati o aggregati in qualche modo.
Per ridurre i tempi di trasmissione potrei effettuare tutte le operazioni all'interno della stessa macchina.
Ovviamente questo mi fa perdere il vantaggio del parallelismo, quindi bisogna trovare un giusto
compromesso fra Wall-Clock Time e tempi di trasmissione dati.
Wall-Clock Time: tempo necessario per completare un algoritmo in parallelo.
Complexity Theory for Map-Reduce - Reducer Size e Replication Rate
La reducer size è il limite superiore del numero di valori che possono apparire nella lista associata ad una
singola chiave.
• Rendendo piccolo, forziamo un maggior numero di reducers. ( è inversamente proporzionale al
numero di reducers)
○ Più reducer tasks significa più grado di parallelismo e quindi un Wall-Clock time minore.
• Possiamo scegliere un reducer size abbastanza piccolo da essere certi che la computazione associata
ad un singolo reducer possa essere eseguita interamente in memoria primaria.
Esempio: Nel conteggio delle parole la reducer size è il numero di occorrenze di quella parola nel testo. Se
avessi un combiner sarebbe il numero dei map nodes.
Il replication rate è il numero di coppie chiave-valore che sono prodotte da tutti i Map tasks su tutti gli
input, diviso il numero di input (il numero di coppie-chiave valore che vengono prodotte per ogni input).
Può essere visto come la comunicazione media fra i Map tasks e i Reducer Tasks per input.
• Aumentare il replication rate significa aumentare il parallelismo, ma anche i costi di trasmissione.
• Viceversa, con piccolo si perde parallelismo, ma diminuiscono anche i costi di trasmissione.
Il prodotto tra matrici two steps produce una coppia chiave-valore per ogni elemento, mentre l'algoritmo
one step ne produce per ogni elemento, dove è la dimensione della matrice.
Aumento il numero di coppie chiave-valore, e quindi del replication rate, i costi di comunicazione
aumentano, ma se ne ho poche non posso applicare molto parallelismo in fase di riduzione.
Esempio: One step matrix multiplication, tutte le matrici sono
Replication rate è pari ad :
○ per ogni elemento , vengono prodotte coppie chiave-valore.
○ per ogni elemento , vengono prodotte coppie chiave-valore.
Il reducer size richiesto è :
○ per ogni chiave ci sono coppie chiave-valore che rappresentano gli
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.
Scarica il documento per vederlo tutto.