Articles

Introduzione ad Apache Spark con esempi e casi d’uso

Ho sentito parlare di Spark per la prima volta alla fine del 2013 quando mi sono interessato a Scala, il linguaggio in cui è scritto Spark. Qualche tempo dopo, ho fatto un divertente progetto di scienza dei dati cercando di prevedere la sopravvivenza sul Titanic. Questo si è rivelato essere un ottimo modo per ottenere ulteriormente introdotto ai concetti Spark e programmazione. Lo consiglio vivamente per tutti gli aspiranti sviluppatori Spark alla ricerca di un posto per iniziare.

Oggi, Spark viene adottato da importanti attori come Amazon, eBay e Yahoo! Molte organizzazioni eseguono Spark su cluster con migliaia di nodi. Secondo le FAQ Spark, il più grande cluster conosciuto ha oltre 8000 nodi. In effetti, Spark è una tecnologia che vale la pena prendere nota e conoscere.

Questo articolo fornisce un’introduzione a Spark inclusi casi d’uso ed esempi. Esso contiene informazioni dal sito web Apache Spark così come il libro di apprendimento Spark – Lightning-Fast Big Data Analysis.

Che cos’è Apache Spark? Un’introduzione

Spark è un progetto Apache pubblicizzato come “lightning fast cluster computing”. Ha una fiorente comunità open-source ed è il progetto Apache più attivo al momento.

Spark fornisce una piattaforma di elaborazione dati più veloce e più generale. Spark consente di eseguire programmi fino a 100 volte più veloce in memoria, o 10 volte più veloce su disco, di Hadoop. L’anno scorso, Spark ha assunto Hadoop completando il 100 TB Daytona GraySort contest 3x più veloce su un decimo del numero di macchine ed è diventato anche il motore open source più veloce per l’ordinamento di un petabyte.

Spark permette anche di scrivere codice più rapidamente, come si dispone di oltre 80 operatori di alto livello a vostra disposizione. Per dimostrarlo, diamo un’occhiata al ” Ciao Mondo!”di BigData: l’esempio di conteggio delle parole. Scritto in Java per MapReduce ha circa 50 righe di codice, mentre in Spark (e Scala) puoi farlo semplicemente come questo:

sparkContext.textFile("hdfs://...") .flatMap(line => line.split(" ")) .map(word => (word, 1)).reduceByKey(_ + _) .saveAsTextFile("hdfs://...")

Un altro aspetto importante quando si impara come usare Apache Spark è la shell interattiva (REPL) che fornisce pronta all’uso. Usando REPL, è possibile testare il risultato di ogni riga di codice senza prima dover codificare ed eseguire l’intero lavoro. Il percorso verso il codice di lavoro è quindi molto più breve e l’analisi dei dati ad hoc è resa possibile.

Altre caratteristiche chiave di Spark includono:

  • Attualmente fornisce API in Scala, Java e Python, con supporto per altri linguaggi (come R) sulla strada
  • Si integra bene con l’ecosistema Hadoop e le origini dati (HDFS, Amazon S3, Hive, HBase, Cassandra, ecc.)
  • Può essere eseguito su cluster gestiti da Hadoop YARN o Apache Mesos, e può anche eseguire standalone

Il nucleo Spark è completato da una serie di potenti librerie di livello superiore che possono essere utilizzate senza problemi nella stessa applicazione. Queste librerie attualmente includono SparkSQL, Spark Streaming, MLlib (per l’apprendimento automatico) e GraphX, ognuna delle quali è ulteriormente dettagliata in questo articolo. Altre librerie Spark ed estensioni sono attualmente in fase di sviluppo pure.

librerie ed estensioni spark

Spark Core

Spark Core è il motore di base per l’elaborazione di dati su larga scala in parallelo e distribuiti. È responsabile di:

  • gestione della memoria e recupero dei guasti
  • pianificazione, distribuzione e monitoraggio dei processi su un cluster
  • interazione con i sistemi di storage

Spark introduce il concetto di RDD (Resilient Distributed Dataset), una raccolta distribuita di oggetti tollerante ai guasti immutabile che può essere gestita in parallelo. Un RDD può contenere qualsiasi tipo di oggetto e viene creato caricando un set di dati esterno o distribuendo una raccolta dal programma driver.

RDDs supportano due tipi di operazioni:

  • Le trasformazioni sono operazioni (come map, filter, join, union e così via) che vengono eseguite su un RDD e che producono un nuovo RDD contenente il risultato.
  • Le azioni sono operazioni (come ridurre, contare, prima e così via) che restituiscono un valore dopo aver eseguito un calcolo su un RDD.

Le trasformazioni in Spark sono “pigre”, il che significa che non calcolano immediatamente i loro risultati. Invece, semplicemente “ricordano” l’operazione da eseguire e il set di dati (ad esempio, file) a cui l’operazione deve essere eseguita. Le trasformazioni vengono effettivamente calcolate solo quando viene chiamata un’azione e il risultato viene restituito al programma driver. Questo design consente a Spark di funzionare in modo più efficiente. Ad esempio, se un file di grandi dimensioni è stato trasformato in vari modi e passato alla prima azione, Spark elaborerebbe e restituirebbe il risultato solo per la prima riga, piuttosto che eseguire il lavoro per l’intero file.

Per impostazione predefinita, ogni RDD trasformato può essere ricalcolato ogni volta che si esegue un’azione su di esso. Tuttavia, è anche possibile mantenere un RDD in memoria utilizzando il metodo persist o cache, nel qual caso Spark manterrà gli elementi sul cluster per un accesso molto più veloce la prossima volta che lo si interroga.

SparkSQL

SparkSQL è un componente Spark che supporta l’interrogazione dei dati tramite SQL o tramite il linguaggio di query Hive. È nato come porta Apache Hive per funzionare su Spark (al posto di MapReduce) ed è ora integrato con lo stack Spark. Oltre a fornire supporto per varie origini dati, consente di tessere query SQL con trasformazioni di codice che si traduce in uno strumento molto potente. Di seguito è riportato un esempio di query compatibile con Hive:

// sc is an existing SparkContext.val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")// Queries are expressed in HiveQLsqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

Spark Streaming

Spark Streaming supporta l’elaborazione in tempo reale dei dati di streaming, come i file di log del server Web di produzione (ad esempio Apache Flume e HDFS/S3), social media come Twitter e varie code di Sotto il cofano, Spark Streaming riceve i flussi di dati di input e divide i dati in batch. Successivamente, vengono elaborati dal motore Spark e generano il flusso finale dei risultati in lotti, come illustrato di seguito.

spark streaming

L’API Spark Streaming è molto simile a quella del Core Spark, rendendo facile per i programmatori lavorare nel mondo dei dati batch e in streaming.

MLlib

MLlib è una libreria di apprendimento automatico che fornisce vari algoritmi progettati per scalare su un cluster per la classificazione, la regressione, il clustering, il filtro collaborativo e così via (controlla l’articolo di Toptal sull’apprendimento automatico per ulteriori informazioni su questo argomento). Alcuni di questi algoritmi funzionano anche con lo streaming di dati, come la regressione lineare usando i minimi quadrati ordinari o il clustering k-means (e altro ancora in arrivo). Apache Mahout (una libreria di apprendimento automatico per Hadoop) si è già allontanato da MapReduce e ha unito le forze su Spark MLlib.

GraphX

graphx

GraphX è una libreria per la manipolazione di grafici e l’esecuzione di operazioni grafo-parallele. Fornisce uno strumento uniforme per ETL, analisi esplorativa e calcoli grafici iterativi. Oltre alle operazioni integrate per la manipolazione del grafico, fornisce una libreria di algoritmi grafici comuni come PageRank.

Come usare Apache Spark: Event Detection Use Case

Ora che abbiamo risposto alla domanda “Cos’è Apache Spark?”, pensiamo a che tipo di problemi o sfide potrebbe essere utilizzato in modo più efficace.

Mi sono imbattuto in un articolo di recente su un esperimento per rilevare un terremoto analizzando un flusso di Twitter. È interessante notare che è stato dimostrato che questa tecnica era probabile che ti informasse di un terremoto in Giappone più velocemente della Japan Meteorological Agency. Anche se hanno usato diverse tecnologie nel loro articolo, penso che sia un ottimo esempio per vedere come potremmo mettere Spark da usare con frammenti di codice semplificati e senza il codice di colla.

In primo luogo, dovremmo filtrare i tweet che sembrano rilevanti come “terremoto” o “scuotimento”. Potremmo facilmente usare Spark Streaming per questo scopo come segue:

TwitterUtils.createStream(...) .filter(_.getText.contains("earthquake") || _.getText.contains("shaking"))

Quindi, dovremmo eseguire alcune analisi semantiche sui tweet per determinare se sembrano fare riferimento a un evento di terremoto corrente. Tweets come ” Terremoto!”o” Ora sta tremando”, per esempio, sarebbe prendere in considerazione le partite positive, mentre tweets come” Partecipare a una conferenza terremoto ”o” Il terremoto di ieri è stato spaventoso ” non lo farebbero. Gli autori del documento hanno utilizzato una macchina vettoriale di supporto (SVM) per questo scopo. Faremo lo stesso qui, ma possiamo anche provare una versione in streaming. Un esempio di codice risultante da MLlib sarebbe simile al seguente:

// We would prepare some earthquake tweet data and load it in LIBSVM format.val data = MLUtils.loadLibSVMFile(sc, "sample_earthquate_tweets.txt")// Split data into training (60%) and test (40%).val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)val training = splits(0).cache()val test = splits(1)// Run training algorithm to build the modelval numIterations = 100val model = SVMWithSGD.train(training, numIterations)// Clear the default threshold.model.clearThreshold()// Compute raw scores on the test set. val scoreAndLabels = test.map { point => val score = model.predict(point.features) (score, point.label)}// Get evaluation metrics.val metrics = new BinaryClassificationMetrics(scoreAndLabels)val auROC = metrics.areaUnderROC()println("Area under ROC = " + auROC)

Se siamo soddisfatti del tasso di previsione del modello, potremmo passare allo stadio successivo e reagire ogni volta che scopriamo un terremoto. Per rilevarne uno abbiamo bisogno di un certo numero (cioè densità) di tweet positivi in una finestra temporale definita (come descritto nell’articolo). Si noti che, per i tweet con servizi di localizzazione Twitter abilitati, vorremmo anche estrarre la posizione del terremoto. Armati di questa conoscenza, potremmo usare SparkSQL e interrogare una tabella Hive esistente (memorizzando gli utenti interessati a ricevere notifiche di terremoto) per recuperare i loro indirizzi e-mail e inviare loro un’e-mail di avviso personalizzata, come segue:

// sc is an existing SparkContext.val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)// sendEmail is a custom functionsqlContext.sql("FROM earthquake_warning_users SELECT firstName, lastName, city, email") .collect().foreach(sendEmail)

Altri casi d’uso di Apache Spark

I potenziali casi d’uso di Spark

Ecco un rapido (ma certamente neanche lontanamente esaustivo!) campionamento di altri casi d’uso che richiedono di trattare con la velocità, varietà e volume dei Big Data, per i quali la Scintilla è così adatto:

l’industria del gioco, l’elaborazione e la scoperta di modelli potenziale di firehose in tempo reale degli eventi di gioco e di essere in grado di rispondere immediatamente, è una capacità che potrebbe produrre un business lucrativo, per esempio a fini di conservazione del giocatore, pubblicità mirata, regolazione automatica del livello di complessità, e così via.

Nel settore dell’e-commerce, le informazioni sulle transazioni in tempo reale potrebbero essere passate a un algoritmo di clustering in streaming come k-means o un filtro collaborativo come ALS. I risultati potrebbero quindi anche essere combinati con altre fonti di dati non strutturate, come i commenti dei clienti o le recensioni dei prodotti, e utilizzati per migliorare e adattare costantemente le raccomandazioni nel tempo con le nuove tendenze.

Nel settore finanziario o della sicurezza, lo stack Spark potrebbe essere applicato a un sistema di rilevamento di frodi o intrusioni o all’autenticazione basata sul rischio. Potrebbe ottenere risultati di prim’ordine raccogliendo enormi quantità di log archiviati, combinandoli con origini dati esterne come informazioni su violazioni dei dati e account compromessi (vedi, ad esempio, https://haveibeenpwned.com/) e informazioni dalla connessione/richiesta come la geolocalizzazione IP o il tempo.

Conclusione

Per riassumere, Spark aiuta a semplificare il compito impegnativo e computazionalmente intensivo di elaborazione di elevati volumi di dati in tempo reale o archiviati, sia strutturati che non strutturati, integrando perfettamente funzionalità complesse rilevanti come l’apprendimento automatico e gli algoritmi di grafico. Spark porta grande elaborazione dei dati per le masse. Guarda qua!