Articles

Wprowadzenie do Apache Spark z przykładami i przypadkami użycia

Po raz pierwszy usłyszałem o Spark pod koniec 2013 roku, kiedy zainteresowałem się Scala, językiem, w którym jest napisany Spark. Jakiś czas później zrobiłem fajny projekt data science próbując przewidzieć przetrwanie na Titanicu. Okazało się to świetnym sposobem na dalsze zapoznanie się z koncepcjami i programowaniem Spark. Gorąco polecam go wszystkim początkującym programistom Spark, którzy szukają miejsca na rozpoczęcie pracy.

dziś Spark jest adoptowany przez głównych graczy, takich jak Amazon, eBay i Yahoo! Wiele organizacji działa na klastrach Spark z tysiącami węzłów. Według Spark FAQ, największy znany klaster ma ponad 8000 węzłów. Rzeczywiście, Spark jest technologią, którą warto wziąć pod uwagę i poznać.

Ten artykuł zawiera wprowadzenie do programu Spark, w tym przykłady użycia. Zawiera informacje ze strony Apache Spark, a także książkę Learning Spark-błyskawiczna Analiza Big Data.

co to jest Apache Spark? Wprowadzenie

Spark to projekt Apache reklamowany jako „błyskawiczne przetwarzanie klastrów”. Ma kwitnącą społeczność open-source i jest najbardziej aktywnym projektem Apache w tej chwili.

Spark zapewnia szybszą i bardziej ogólną platformę przetwarzania danych. Spark umożliwia uruchamianie programów do 100 razy szybciej w pamięci lub 10 razy szybciej na dysku niż Hadoop. W zeszłym roku Spark przejął Hadoop, kończąc Konkurs Daytona GraySort 100 TB 3x szybciej na jednej dziesiątej liczby maszyn, a także stał się najszybszym silnikiem open source do sortowania petabajtów.

Spark umożliwia również szybsze pisanie kodu, ponieważ masz do dyspozycji ponad 80 operatorów wysokiego poziomu. Aby to zademonstrować, rzućmy okiem na ” Hello World!”of BigData: the word Count example. Napisany w Javie dla MapReduce ma około 50 linii kodu, podczas gdy w Spark (i Scali) możesz to zrobić tak po prostu:

sparkContext.textFile("hdfs://...") .flatMap(line => line.split(" ")) .map(word => (word, 1)).reduceByKey(_ + _) .saveAsTextFile("hdfs://...")

innym ważnym aspektem podczas nauki korzystania z Apache Spark jest interactive shell (REPL), który dostarcza po wyjęciu z pudełka. Używając REPL, można przetestować wynik każdej linii kodu bez konieczności kodowania i wykonywania całego zadania. Ścieżka do kodu roboczego jest zatem znacznie krótsza i możliwa jest analiza danych ad-hoc.

dodatkowe kluczowe cechy Spark to:

  • obecnie dostarcza API w Scali, Javie i Pythonie, z obsługą innych języków (takich jak R) po drodze
  • dobrze integruje się z ekosystemem Hadoop i źródłami danych (HDFS, Amazon S3, Hive, HBase, Cassandra itp.)
  • może działać na klastrach zarządzanych przez Hadoop YARN lub Apache Mesos, a także może działać samodzielnie

rdzeń Spark jest uzupełniony o zestaw potężnych bibliotek wyższego poziomu, które mogą być płynnie używane w tej samej aplikacji. Obecnie biblioteki te obejmują SparkSQL, Spark Streaming, MLlib (do uczenia maszynowego) i GraphX, z których każda jest szczegółowo opisana w tym artykule. Obecnie trwają również prace nad dodatkowymi bibliotekami i rozszerzeniami Spark.

biblioteki i rozszerzenia spark

Spark Core

Spark Core jest podstawowym silnikiem do równoległego i rozproszonego przetwarzania danych na dużą skalę. Jest odpowiedzialny za:

  • zarządzanie pamięcią i odzyskiwanie błędów
  • planowanie, dystrybucja i monitorowanie zadań w klastrze
  • interakcja z systemami pamięci masowej

Spark wprowadza koncepcję RDD (Resilient Distributed Dataset), niezmiennego, odpornego na błędy, rozproszonego zbioru obiektów, które mogą być obsługiwane równolegle. RDD może zawierać dowolny typ obiektu i jest tworzony przez załadowanie zewnętrznego zbioru danych lub dystrybucję kolekcji z programu sterownika.

RDD obsługują dwa rodzaje operacji:

  • transformacje są operacjami (takimi jak map, filter, join, union, itd.), które są wykonywane na RDD i które dają nowy RDD zawierający wynik.
  • akcje są operacjami (takimi jak reduce, count, first, itd.), które zwracają wartość po uruchomieniu obliczeń na RDD.

Transformacje w Spark są „leniwe”, co oznacza, że nie obliczają od razu swoich wyników. Zamiast tego po prostu” zapamiętują ” operację do wykonania i zbiór danych (np. plik), do którego operacja ma być wykonana. Transformacje są faktycznie obliczane tylko wtedy, gdy wywołana jest akcja, a wynik jest zwracany do Programu sterowników. Ta konstrukcja umożliwia bardziej wydajne działanie Spark. Na przykład, jeśli duży plik został przekształcony na różne sposoby i przekazany do pierwszej akcji, Spark przetworzy i zwróci wynik tylko dla pierwszej linii, zamiast wykonywać pracę dla całego pliku.

domyślnie każde przekształcone RDD może być obliczane za każdym razem, gdy wykonasz na nim akcję. Możesz jednak również zapisać RDD w pamięci używając metody persist lub cache, w którym to przypadku Spark będzie trzymał elementy w klastrze, aby uzyskać znacznie szybszy dostęp przy następnym zapytaniu.

SparkSQL

SparkSQL jest komponentem Spark, który obsługuje zapytania danych za pośrednictwem języka SQL lub języka zapytań Hive. Powstał jako port Apache Hive do uruchomienia na górze Spark (w miejsce MapReduce) i jest teraz zintegrowany ze stosem Spark. Oprócz zapewnienia wsparcia dla różnych źródeł danych, umożliwia splot zapytań SQL z transformacjami kodu, co skutkuje bardzo potężnym narzędziem. Poniżej znajduje się przykład zapytania zgodnego z Hive:

// sc is an existing SparkContext.val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")// Queries are expressed in HiveQLsqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

Spark Streaming

Spark Streaming obsługuje przetwarzanie w czasie rzeczywistym danych strumieniowych, takich jak pliki dziennika serwera produkcyjnego (np. Pod maską Spark Streaming odbiera wejściowe strumienie danych i dzieli dane na partie. Następnie są one przetwarzane przez silnik zapłonowy i generują końcowy strumień wyników w partiach, jak pokazano poniżej.

Spark streaming

API Spark Streaming jest ściśle zgodne z rdzeniem Spark, co ułatwia programistom pracę w świecie zarówno wsadowych, jak i strumieniowych danych.

MLlib

MLlib to biblioteka uczenia maszynowego, która zapewnia różne algorytmy zaprojektowane do skalowania na klastrze w celu klasyfikacji, regresji, klastrowania, filtrowania opartego na współpracy i tak dalej (więcej informacji na ten temat znajdziesz w artykule Toptal na temat uczenia maszynowego). Niektóre z tych algorytmów działają również z danymi strumieniowymi, takimi jak regresja liniowa przy użyciu zwykłych najmniejszych kwadratów lub grupowanie K-oznacza (i więcej na drodze). Apache Mahout (biblioteka uczenia maszynowego dla Hadoop) odwrócił się już od MapReduce i połączył siły na Spark MLlib.

GraphX

graphx

GraphX jest biblioteką do manipulowania wykresami i wykonywania operacji graficzno-równoległych. Zapewnia jednolite narzędzie do ETL, analizy eksploracyjnej i iteracyjnych obliczeń grafów. Oprócz wbudowanych operacji do manipulacji grafem, zapewnia bibliotekę popularnych algorytmów grafów, takich jak PageRank.

Jak korzystać z Apache Spark: przypadek użycia wykrywania zdarzeń

teraz, gdy odpowiedzieliśmy na pytanie „Co To jest Apache Spark?”, zastanówmy się, jakiego rodzaju problemy lub wyzwania mogą być wykorzystane do najbardziej efektywnie.

natknąłem się ostatnio na artykuł o eksperymencie mającym na celu wykrycie trzęsienia ziemi poprzez analizę strumienia na Twitterze. Co ciekawe, wykazano, że ta technika prawdopodobnie poinformuje o trzęsieniu ziemi w Japonii szybciej niż japońska agencja meteorologiczna. Mimo że w swoim artykule wykorzystali inną technologię, myślę, że jest to świetny przykład, aby zobaczyć, jak możemy użyć Spark z uproszczonymi fragmentami kodu i bez kodu kleju.

najpierw musielibyśmy filtrować tweety, które wydają się istotne, takie jak” trzęsienie ziemi „lub”drżenie”. Możemy łatwo użyć Spark Streaming w tym celu w następujący sposób:

TwitterUtils.createStream(...) .filter(_.getText.contains("earthquake") || _.getText.contains("shaking"))

następnie musielibyśmy przeprowadzić analizę semantyczną tweetów, aby ustalić, czy wydają się odnosić do aktualnego wystąpienia trzęsienia ziemi. Tweety jak ” trzęsienie ziemi!”lub” teraz się trzęsie”, na przykład, byłoby uważane za pozytywne mecze, podczas gdy tweety takie jak” udział w konferencji trzęsienia ziemi „lub” trzęsienie ziemi wczoraj było straszne ” nie. Autorzy pracy wykorzystali do tego celu maszynę wektorów nośnych (SVM). Zrobimy to samo tutaj, ale możemy również wypróbować wersję strumieniową. Wynikowy przykład kodu z MLlib wyglądałby następująco:

// We would prepare some earthquake tweet data and load it in LIBSVM format.val data = MLUtils.loadLibSVMFile(sc, "sample_earthquate_tweets.txt")// Split data into training (60%) and test (40%).val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)val training = splits(0).cache()val test = splits(1)// Run training algorithm to build the modelval numIterations = 100val model = SVMWithSGD.train(training, numIterations)// Clear the default threshold.model.clearThreshold()// Compute raw scores on the test set. val scoreAndLabels = test.map { point => val score = model.predict(point.features) (score, point.label)}// Get evaluation metrics.val metrics = new BinaryClassificationMetrics(scoreAndLabels)val auROC = metrics.areaUnderROC()println("Area under ROC = " + auROC)

Jeśli jesteśmy zadowoleni z szybkości przewidywania modelu, możemy przejść do następnego etapu i zareagować za każdym razem, gdy odkryjemy trzęsienie ziemi. Aby je wykryć, potrzebujemy określonej liczby (np. gęstości) pozytywnych tweetów w określonym oknie czasowym (jak opisano w artykule). Zauważ, że w przypadku tweetów z włączonymi usługami lokalizacji Twittera wyodrębnilibyśmy również lokalizację trzęsienia ziemi. Uzbrojeni w tę wiedzę, możemy użyć SparkSQL i odpytywać istniejącą tabelę Hive (przechowującą użytkowników zainteresowanych otrzymywaniem powiadomień o trzęsieniu ziemi), aby pobrać ich adresy e-mail i wysłać im spersonalizowaną wiadomość ostrzegawczą, w następujący sposób:

// sc is an existing SparkContext.val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)// sendEmail is a custom functionsqlContext.sql("FROM earthquake_warning_users SELECT firstName, lastName, city, email") .collect().foreach(sendEmail)

inne przypadki użycia Apache Spark

potencjalne przypadki użycia Spark wykraczają daleko poza wykrywanie trzęsień ziemi oczywiście.

oto szybki (ale na pewno nie wyczerpujący!) próbkowanie innych przypadków użycia, które wymagają radzenia sobie z prędkością, różnorodnością i objętością Big Data, dla których Spark jest tak dobrze dopasowany:

w branży gier przetwarzanie i odkrywanie wzorców z potencjalnego ogniska wydarzeń w czasie rzeczywistym w grze i możliwość natychmiastowego reagowania na nie jest zdolnością, która może przynieść lukratywny biznes, do celów takich jak zatrzymanie gracza, ukierunkowana Reklama, automatyczna regulacja poziomu złożoności i tak dalej.

w branży e-commerce informacje o transakcjach w czasie rzeczywistym mogą być przekazywane do algorytmu grupowania strumieniowego, takiego jak k-means lub filtrowania opartego na współpracy, takiego jak ALS. Wyniki mogą być następnie łączone z innymi nieustrukturyzowanymi źródłami danych, takimi jak komentarze klientów lub recenzje produktów, i wykorzystywane do ciągłego ulepszania i dostosowywania rekomendacji do nowych trendów.

w branży finansowej lub bezpieczeństwa stos Spark może być stosowany do systemu wykrywania oszustw lub włamań lub uwierzytelniania opartego na ryzyku. Może osiągnąć najlepsze wyniki, zbierając ogromne ilości zarchiwizowanych dzienników, łącząc je z zewnętrznymi źródłami danych, takimi jak informacje o naruszeniach danych i naruszonych kontach (patrz na przykład https://haveibeenpwned.com/) I informacje z połączenia/żądania, takie jak geolokalizacja IP lub czas.

wnioski

Podsumowując, Spark pomaga uprościć trudne i intensywne obliczeniowo zadanie przetwarzania dużych ilości danych w czasie rzeczywistym lub zarchiwizowanych, zarówno ustrukturyzowanych, jak i nieustrukturyzowanych, płynnie integrując odpowiednie złożone możliwości, takie jak uczenie maszynowe i algorytmy Wykresów. Spark zapewnia masowe przetwarzanie dużych zbiorów danych. Zobacz!