Про Hadoop мне рассказали в офисе Netflix в 2011 году. Я сразу стал искать и читать документацию по этому сервису, смотрел на YouTube конференции о том, как с ним работать. В качестве эксперимента я развернул Hadoop на своем рабочем ноутбуке, выбрав в качестве дистрибутива версию от Cloudera. Удобство Hadoop в том, что его можно поставить хоть на ноутбуке – все сервисы будут крутиться на нем. По мере увеличения объема данных можно легко добавлять сервера, причем даже используя самые дешевые. Именно так я и поступил, когда начал писать рекомендательный движок Retail Rocket. Начал я с одного или двух серверов, пять лет спустя кластер Hadoop вырос до 50 машин и содержит порядка двух петабайт сжатых данных.
В начале пути я пользовался двумя языками программирования для Hadoop – Pig и Hive. На них была написана первая версия рекомендательного движка, затем мы перешли на Spark и стали писать на Scala.
Не пренебрегайте принципами MapReduce. Однажды мне они пригодились, когда я участвовал в одном из соревнований Kaggle. Датасет был очень большой, он не помещался полностью в память, пришлось писать предварительную обработку на чистом Python, используя подход MapReduce. Тогда у меня это заняло много времени, сейчас я бы поставил локально фреймворк Spark и не изобретал бы велосипед. Это сработает, ведь MapReduce-операции можно выполнять как параллельно на разных машинах, так и последовательно. Сами вычисления займут много времени, но они хотя бы посчитаются, и у вас не будет болеть голова, хватит ли памяти для расчетов.
Spark
С фреймворком Spark я познакомился в 2012 году, когда приобрел для корпоративной библиотеки Ostrovok.ru видеозаписи конференции по анализу данных Strata. Эту конференцию организует издательство O’Reilly в США. На одной из лекций я увидел, как Матей Захария (основной автор Spark) рассказывает о преимуществах Spark над чистой реализацией MapReduce на Hadoop. Самое главное преимущество в том, что Spark загружает данные в память, в так называемые отказоустойчивые распределенные датасеты RDD (resilient distributed dataset), и позволяет работать с ними в памяти итеративно. Чистый Hadoop же полагается на дисковую память – для каждой пары операций MapReduce-данные читаются с диска, затем сохраняются. Если алгоритм требует применения еще нескольких операций, то для каждой из них придется читать данные с диска и сохранять обратно. Spark же, совершив первую операцию, сохраняет данные в памяти, и последующие операции MapReduce будут работать с этим массивом, пока программа не прикажет явно сохранить их на диск. Это очень важно для задач машинного обучения, где используются итеративные алгоритмы поиска оптимального решения. Все это дало огромный прирост производительности, иногда в 100 раз быстрее классического Hadoop.
Эти идеи из лекции Матея Захарии мне пригодились через несколько лет, когда мы стали писать вторую версию рекомендательного движка Retail Rocket. Тогда как раз вышла версия 1.0.0. Летом 2014 года мы попробовали запустить Spark поверх Hadoop – и все получилось. В экспериментах мы достигли 3–4-кратного ускорения. Была, правда, проблема с производительностью на большом количестве мелких файлов. Мы написали небольшую библиотеку, которая склеивала их при загрузке, и проблема решилась [41]. Сейчас мы по-прежнему используем Spark на нашем Hadoop-кластере и ни о чем не жалеем.
При переходе с чистого Hadoop на «Spark поверх Hadoop» пришлось отказаться от большого количества кода на языке Pig, Hive, Java, Python – весь этот зоопарк доставлял большую головную боль, ведь их все должны были знать. Для прототипирования задач машинного обучения мы использовали связку питоновских инструментов: IPython + Pyhs2 (hive-драйвер Python) + Pandas + Sklearn. Со Spark мы смогли начать пользоваться только одним языком программирования для прототипирования экспериментальных функций и для рабочего варианта. Это было очень большое достижение для нашей небольшой команды.
Spark поддерживает четыре языка программирования из коробки Python/Scala/Java/R через API. Сам Spark написан на Scala, поэтому мы выбрали его. В таком случае мы сможем свободно читать исходный код Spark и даже исправлять ошибки в нем (это нам не пригодилось). Кроме того, Scala принадлежит к семейству JVM-языков, что очень удобно при работе с файловой системой Hadoop напрямую через API, так как он написан на Java.
Ниже приведено сравнение языков программирования для Spark, где + и – означают плюсы и минусы языка соответственно.
Scala:
+ функциональный, поэтому очень удобный для обработки данных любых объемов;
+ родной для Spark, это важно, если нужно понимать работу Spark изнутри;
+ основан на JVM, что делает его совместимым с Hadoop;
+ строгая типизация, тогда компилятор поможет найти часть ваших ошибок;
– труден в освоении, нам пришлось разработать свою программу обучения для новичков [19];
– сложный наем – разработчики на Scala дороже, чем на Java или Python;
– сам язык не настолько распространен, как Java или Python.
Python:
+ популярный;
+ простой;
– динамическая типизация, ошибки, которые мог обнаружить компилятор;
– производительность хуже, чем Scala;
– нет чистой функциональности, как у Scala.
Java:
+ популярный;
+ родной для Hadoop;
+ строгая типизация;
– нефункционален (на самом деле после Java 8 все стало намного лучше).
Когда писалась вторая версия рекомендательного движка Retail Rocket, решение выбрать Scala основным языком программирования далось непросто, потому что этот язык никто не знал. Если бы мне пришлось выбирать сейчас, возможно, я бы посмотрел в сторону Java 8 и выше (версий), поскольку Java-разработчиков проще найти, чем тех, кто знает Scala.
Сейчас Spark идет в сторону дата-фреймов и дата-сетов (dataframe dataset), моду на которые сделала библиотека pandas [42] для Python – самая популярная для анализа данных. На них проще писать, но есть один нюанс. Компилятор не может проверить корректность работы с внутренними переменными, что не очень хорошо для больших проектов.
Оптимизация скорости работы
Если пользователя не устраивает скорость ответа аналитической системы, значит, пора оптимизировать скорость работы хранилища. Решение может быть как простым, так и сложным. Первое, что необходимо понять, – можно ли обойтись малой кровью. В реляционных базах данных можно добавить индексы. В базах данных для этого имеются так называемые профайлеры, которые на основе запросов пользователей предложат, какие именно индексы добавить. В колоночной базе данных Clickhouse можно сменить схему партицирования или сделать сэмплирование, когда запрос будет использовать только часть данных. В Hadoop можно выделить больше ресурсов или даже оптимизировать программный код. Я практиковал оба способа.