Вступ (Лінивим можна пропустити)
Врамках Big Data тренінгу я почав вивчати роботу з Apach Spark computing framework. В інтернеті є достатня кількість прикладів побудови пайплайнів на різних мовах програмування (наприклад тут), проте я ніде не зміг найти хорошого гайду чи прикладу як написати проект з “нуля” і запустити його виконання на кластері. Усі що я знаходив приклади використовували локальний (тестовий) режим мастера , таким чином ніякої дистрибуції логіки не відбувалося і відчути що таке Apache Spark в повній мірі мені не вдавалося. До дня коли я прослухав тренінг людини, що працювала з цим інструментом в продакшені, саме він вніс ясність в те як Apache Spark працює (дякую тобі Тарас Матяшовський). Отже, після того як я в цьому розібрався я вирішив залишити ці знання на просторах Java User Group для всіх кому цікаво і для самого себе, щоб не забути 😉 Також, надіюся з заголовку статті зрозуміло що тут буде йтись лише про поверхневе ознайомлення з спарком і основна мета це зробити усе правильно. Тут буде використаний Stand Alone Spark Master який не потрібно використувувати в продакшені, для цого ви може використати Hadoop Yarn.
Опис задачі
Задача взята з документації ApacheSpark – підрахунок слів у текстовому файлі. Проте зробимо це по законам жанру:
1. Дистрибутивна бібліотека (саме та яка розлетиться по воркерах кластеру і буде робити усю роботу);
2. Spark Driver (іншими словами це і буде наш клієнт через який ми доступатимемось до мастера і де ми будемо будувати наші пайплайни);
3. Web application (Наша веб аплікація – ендпоінти які ми будемо викликати щоб виконати ту чи іншу роботу, ну і щоб Spark Application UI був доступний і ми змогли подивитись статистику виконання роботи).
Надіюся усе зрозуміло , проте якщо й ні , то далі постараюся усе розписати як найкраще. Поїхали !
Створення архітектури проектів
Для збірки проектів я буду використовувати Apache Maven, і почнемо з того що створемо батьківський проект який назвемо “spark-for-newbie” і додамо туди pom.xml файл. Батьківський pom.xml файл повинен включати усі вище згадані модулі :
<modules>
<module>distributed-library</module>
<module>web-api</module>
<module>spark-driver</module>
</modules>
Тепер підключаємо до нього основні залежності :
– Залежності на наші бібліотечні модулі :
<dependency>
<groupId>org.ar.spark.newbie</groupId>
<artifactId>spark-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.ar.spark.newbie</groupId>
<artifactId>distributed-library</artifactId>
<version>${project.version}</version>
</dependency>
– Також залежність на останню (на момент публікації) версію Apache Spark :
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
Тепер створюємо наші модулі в середині батьківського проекту:
distributed-library
Додаємо до цього проекту лише залежність на Apache Spark , так як ця бібліотека міститиме класи функцій з його пакету, що будуть виконуватись на різних воркерах нашого кластеру.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<scope>provided</scope>
</dependency>
spark-driver
Цей проект також буде бібліотекою яку ми використаємо у наступному web-api модулі. Він міститиме Spark Conext та залежність на функції, що будуть виконувати логіку нашого обчислення, тому нам потрібні наступні залежності :
<dependency>
<groupId>org.ar.spark.newbie</groupId>
<artifactId>distributed-library</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
</dependency>
web-api
Даний модуль буде нашою основною аплікацією, він повинен містити залежність на наш Spark Driver а також на дуже цікаву бібліотеку “Java Spark”, завдяки якій ми зробимо REST Endpoint всього в одну лінійку коду. Spark Java, немає нічого спілького з Big Data та Apache Spark, також вона заслуговує окремої статті щоб описати її можливості.
<dependency>
<groupId>org.ar.spark.newbie</groupId>
<artifactId>spark-driver</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.sparkjava</groupId>
<artifactId>spark-core</artifactId>
<version>2.3</version>
</dependency>
Розробка дистрибутивної бібліотеки (distributed-library)
Для того, щоб підрахувати кількість слів у текстовому файлі ми повинні зробити наступний пайплайн :
Отже нам треба буде написати 4 функції, 3 map і 1 reduce а також клас в який ми будемо мапити key-value рузультати і повернемо результати з воркера назад в аплікацію.
SeparateWordLinesFunction.java – її задача отримати на вхід об’єкт типу String котрий є вичитаною лінією тексту з текстового файлу і розбити на окремі слова , тобто повернути список об’єктів String. Для цього, клас функції робитиме наступне :
public class SeparateWordLinesFunction implements FlatMapFunction<String, String> {
@Override
public Iterable<String> call(String s) throws Exception {
return Arrays.asList(s.split(” “));
}
};
MapWordsToKeyValueFunction.java – повинна замапити кожне слово до лічіильника, який по замовчуванню буде “1”, таким чином ми отримаємо key-value структуру даних. Для того, щоб тримати таку структуру буде використано об’єкт Tuple2 з пакету Scala бібліотеки Apache Spark :
public class MapWordsToKeyValueFunction implements PairFunction<String, String, Integer> {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s, 1);
}
};
ReduceKeyValueWordsByKey.java – reduce операція по ключу нашої структури, тут ми повинні вирішити що будемо робити з значенням і в нашому випадку ми його просумуємо :
public class ReduceKeyValueWordsByKey implements Function2<Integer, Integer, Integer> {
@Override
public Integer call(Integer intVal1, Integer intVal2) throws Exception {
return intVal1 + intVal2;
}
}
MapKeyValueWordsToWrapperObject.java – функція що перетворить key-value структуру в більш лояльний для подальшої роботи об’єкт :
public class MapKeyValueWordsToWrapperObject implements Function<Tuple2<String,Integer>, WordResult> {
@Override
public WordResult call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return new WordResult(stringIntegerTuple2._1, stringIntegerTuple2._2);
}
}
WordsResult.java – звичайний POJO об’єкт котрий містить поля для самого слова і його лічильника. Потрібно лише імплементувати інтерфейс Serializable (для надання можливості серіалізувати його при передачі по мережі) і особисто від себе я перевизначив метод toString так як буду виводи об’єкти напряму до користувача.
Розробка Spark Driver бібліотеки (spark-driver)
Тут ми створемо SparkContext через який отримаємо доступ до мастера і задекларуємо наш пайплайн.
Створення SparkContext :
Для цього необідно створити SparkConf – конфігураційних об’єкт якому ми передамо назву нашої аплікації, шлях до мастера та шлях до скомпільованого JAR файлу який Spark розповсюдить на воркери де будуть виконуватись етапи нашого пайплайну :
private String[] distributedJars = new String[]{“/<path_to_workspace>/distributed-library/target/distributed-library-1.0-SNAPSHOT.jar”};
private JavaSparkContext sparkContext;
public SparkDriver(){
SparkConf sparkConf = new SparkConf().setAppName(“SparkForNewbie”)
.setJars(distributedJars).setMaster(“spark://127.0.0.1:7077”);
this.sparkContext = new JavaSparkContext(sparkConf);
}
Імплементація Pipeline :
public List<WordResult> countWordsFromFile(String filePath){
JavaRDD<String> words = sparkContext.textFile(filePath);
return words
.flatMap(new SeparateWordLinesFunction())
.mapToPair(new MapWordsToKeyValueFunction())
.reduceByKey(new ReduceKeyValueWordsByKey())
.map(new MapKeyValueWordsToWrapperObject())
.collect();
}
З імплементації пайплайну чітко видно нашу попередню схему , де я показував як він виглядатиме. Усі функції підтянуті через залежність і знаходяться в distributed-library.
Розробка Веб аплікації
Цей модуль потрібен нам саме для взаємодії нашого проекту з кінцевим користувачем. Тут ми зробимо енд-поінт який користувач зможе викликати в браузері і запустити на виконання Spark Pipeline, проте найцікавіше в цьому модулі це використання Spark Java, за допомогою якої ми реалізуємо енд-поінт :
get(“/data”, (request, response) -> dataService.countWords())
Угу, саме так, 1 рядок коду і при старті web-api Spark Java розгорне Embedded Jetty і реалізує “/data” енд-поінт. Далі, при виклику нашого енд-поінту викликаємо сервіс який містить в собі інстанс SparkDriver класу і виконує наш пайплайн передаючи шлях до піддослідного файлу:
public class DataService {
private static final String TEXT_FILE_PATH = “/workspace/projects/spark-for-newbie/test.txt”;
private SparkDriver sparkDriver = new SparkDriver();
public List<WordResult> countWords(){
return sparkDriver.countWordsFromFile(TEXT_FILE_PATH);
}
}
Ось і все , усі модулі нашого проекту розписані , і можна переходити до тестування.
Тестування
Запуск Apache Master (stand alone version)
Для цього качаємо дистрибутив Apache Spark (я викачував spark-1.6.0-bin-hadoop2.6.tgz) і йдемо у папку “conf” що в середині.
Тут потрібно змінити розширення файлу “spark-env.sh.template” на “spark-env.sh”, зайти в середину і додати кілька рядків для конфігурації :
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_MASTER_IP=127.0.0.1
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8082
export SPARK_WORKER_MEMORY=2g
export SPARK_WORKER_CORES=2
export SPARK_WORKER_WEBUI_PORT=8083
(Опис усіх цих та інших можливих опцій присутні у цьомуж файлі)
Тепер йдемо назад і переходимо у папку “sbin”, тут виконуємо “./start-master.sh ” і переходимо в браузері по шляху “http://localhost:8082/”, тут має бути розгорутий ЮІ нашого мастера :
Далі запускаємо воркер (так, так це не кластер адже все робиться на одній машині, але спарк про це не знає, він має все що необхідно для роботи у розподіленому режимі – мастер а мастер має воркер). Виконуємо “/start-slave.sh 127.0.0.1:7077” і переходимо по шляху “http://localhost:8083/” :
Тепер у нас є все щоб запустити нашу аплікацію, йдемо в мейн метод модуля web-api та запускаємо його. В логах розгортання проекту ви повинні побачити як спарк розгорає ЮІ нашої аплікації :
Переходимо по цьому шляху і повинні побачити :
Щож, тепер усе готово, відкриваємо нову вкладку в браузері і переходимо на наш енд-поінт :
http://localhost:4567/data
Якщо ж все було зроблено правильно ви повинні побачити результат нашого обчислення :
P.S.
Якщо ж у вас щось не вийшло або не спрацювало, ви можете знайти сорси мого проекту тут . Також ділюся з вами сорсами проекту Тараса, котрі я використовував в довідкових цілях коли знайомився з Apache Spark, вдачі !