Warning: include(/volume1/web/cyberhost.biz/wp-content/plugins/jaster_cahce/cache/top-cache.php): failed to open stream: No such file or directory in /volume1/web/cyberhost.biz/index.php on line 9 Call Stack: 0.0000 356272 1. {main}() /volume1/web/cyberhost.biz/index.php:0 Warning: include(): Failed opening '/volume1/web/cyberhost.biz/wp-content/plugins/jaster_cahce/cache/top-cache.php' for inclusion (include_path='.:/usr/share/pear') in /volume1/web/cyberhost.biz/index.php on line 9 Call Stack: 0.0000 356272 1. {main}() /volume1/web/cyberhost.biz/index.php:0 Flume — управляем потоками данных. Часть 3 | Хостинг за 90 р. от cyberhost.biz — платный хостинг
+7 993 930-19-90 suport@cyberhost.biz

Привет, Хабр! После долгой паузы мы наконец-то возвращаемся к разбору Apache Flume. В предыдущих статьях мы познакомились с Flume (Часть 1) и разобрались, как настраивать основные его компоненты (Часть 2). В этой, заключительной, части цикла мы рассмотрим следующие вопросы:

  • Как настроить мониторинг компонентов узла.
  • Как написать собственную реализацию компонента Flume.
  • Проектирование полноценной транспортной сети.


Мониторинг состояния компонентов узла
Итак, мы настроили и запустили все узлы, проверили их работоспособность — данные успешно доставляются до пункта назначения. Но проходит какое-то время, мы смотрим на результат работы нашей транспортной сети (например, папку с файлами, в которые упаковываются данные) и понимаем, что возникла проблема — начиная с какого-то момента новые файлы не появляются в нашей папке. Следующий шаг кажется очевидным — открываем логи, ищем причину. Беда только в том, что узлов в нашей транспортной сети может быть много, а значит необходимо вручную просматривать логи всех узлов, что, мягко говоря, не очень удобно. Когда подобные проблемы возникают, реагировать на них хотелось бы максимально оперативно, а еще лучше — вообще не допускать таких критичных ситуаций.

Компоненты Flume в процессе работы пишут метрики, которые позволяют оценить состояние узла. По значениям этих метрик довольно легко определить, что с узлом не всё в порядке.

Для хранения счетчиков и других атрибутов своих компонентов Flume использует java.lang.management.ManagementFactory, регистрируя собственные bean-классы для ведения метрик. Все эти классы унаследованы от MonitoredCounterGroup (для любопытных — ссылка на исходный код).

Если вы не планируете разрабатывать собственные компоненты Flume, то закапываться в механизм ведения метрик совершенно необязательно, достаточно разобраться, как их достать. Сделать это можно довольно просто с помощью утилитарного класса JMXPollUtil:

package ru.test.flume.monitoring;
import java.util.Map;
import org.apache.flume.instrumentation.util.JMXPollUtil;
public class FlumeMetrics {
public static Map<String, Map<String, String>> getMetrics() {
Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
return metricsMap;
}
}
В результате вы получите метрики, сгруппированные по компонентам узла, которые выглядят примерно так:

Метрики Flume-компонентов (JSON){
"SOURCE.my-source": {
"EventReceivedCount": "567393607",
"AppendBatchAcceptedCount": "5689696",
"Type": "SOURCE",
"EventAcceptedCount": "567393607",
"AppendReceivedCount": "0",
"StartTime": "1467797931288",
"AppendAcceptedCount": "0",
"OpenConnectionCount": "1",
"AppendBatchReceivedCount": "5689696",
"StopTime": "0"
},
"CHANNEL.my-channel": {
"ChannelCapacity": "100000000",
"ChannelFillPercentage": "5.0E-4",
"Type": "CHANNEL",
"ChannelSize": "500",
"EventTakeSuccessCount": "567393374",
"StartTime": "1467797930967",
"EventTakeAttemptCount": "569291443",
"EventPutSuccessCount": "567393607",
"EventPutAttemptCount": "567393607",
"StopTime": "0"
},
"SINK.my-sink": {
"ConnectionCreatedCount": "1",
"ConnectionClosedCount": "0",
"Type": "SINK",
"BatchCompleteCount": "2",
"EventDrainAttemptCount": "567393374",
"BatchEmptyCount": "959650",
"StartTime": "1467797930968",
"EventDrainSuccessCount": "567393374",
"BatchUnderflowCount": "938419",
"StopTime": "0",
"ConnectionFailedCount": "0"
}
}

Метрики получили, теперь необходимо их куда-то отправить. Здесь можно пойти двумя путями.

  • Использовать возможности Flume для предоставления метрик.
  • Написать свою реализацию обработки метрик.
  • Flume предоставляет API, позволяющей задать способ мониторинга — для этого используются реализации интерфейса MonitorService. Для того, чтобы подключить мониторинг, необходимо указать класс, реализующий MonitorService, в качестве системного свойства при запуске узла (или в коде).
    java -Dflume.monitoring.type=org.apache.flume.instrumentation.http.HTTPMetricsServer …
    System.setProperty("flume.monitoring.type", "org.apache.flume.instrumentation.http.HTTPMetricsServer");
    Класс HTTPMetricsServer предлагает стандартный способ отслеживания состояния узла. Он представляет собой небольшой web-сервер, который по запросу отдает полный список метрик узла в виде JSON (как в примере выше). Чтобы указать порт, на котором этот сервер будет слушать запросы, достаточно добавить в конфигурацию Flume параметр (по умолчанию использует порт 41414):

    flume.monitoring.port = 61509
    Запрос к этому серверу выглядит так: localhost:61509/metrics.

    Если же такого способа следить за метриками недостаточно, то придется пойти вторым путём и написать собственную реализацию MonitorService. Именно так мы и поступили, чтобы наблюдать за состоянием наших узлов с помощью Graphite. Ниже приведен простой пример такой реализации.

    FlumeGraphiteMonitor
    package ru.dmp.flume.monitoring;
    import com.google.common.base.CaseFormat;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import org.apache.flume.Context;
    import org.apache.flume.instrumentation.MonitorService;
    import org.apache.flume.instrumentation.util.JMXPollUtil;
    public class FlumeGraphiteMonitor implements MonitorService {
    // нормализованные имена метрик, которые не нужно отправлять в Graphite
    private static final Set<String> EXCLUDED_METRICS = new HashSet<String>() {{
    add("start-time");
    add("stop-time");
    }};
    private volatile long period = 60 * 1000; // интервал отправки, 1 минута
    private volatile boolean switcher = true;
    private Thread scheduler = new Thread(this::schedule);
    @Override
    public void configure(Context context) {
    // Здесь можно достать какие-нибудь настройки из файла конфигурации
    }
    private void schedule() {
    while (switcher) {
    send();
    synchronized (this) {
    try {
    wait(period);
    } catch (InterruptedException ex) {}
    }
    }
    }
    @Override
    public void start() {
    scheduler.start();
    }
    @Override
    public void stop() {
    switcher = false;
    synchronized (this) {
    notifyAll();
    }
    try {
    scheduler.join();
    } catch (InterruptedException ex) {}
    }
    private void send() {
    Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
    for (Map.Entry<String, Map<String, String>> e: metricsMap.entrySet()) {
    if (e.getValue() != null) {
    // все метрики от узлов Flume начинаем с префикса "flume"
    String group = "flume." + normalize(e.getKey().toLowerCase()) + ".";
    for (Map.Entry<String, String> metric : e.getValue().entrySet()) {
    try {
    Double value = Double.valueOf(metric.getValue());
    String metricName = normalize(metric.getKey());
    if (!EXCLUDED_METRICS.contains(metricName)) {
    String fullName = group + normalize(metric.getKey());
    // Отправляем данные в графит или куда-то еще
    // Graphite.send(metricName, value);
    }
    } catch (NumberFormatException ex) {
    // так отсеиваем значения, не являющиеся числом
    }
    }
    }
    }
    }
    // приводим к виду EventReceivedCount -> event-received-count (необязательно)
    private static String normalize(String str) {
    return CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, str).replaceAll("_", "-");
    }

    В итоге получаем аккуратную ветку Graphite со всеми метриками узла.


    Ниже приведены описания графиков и метрик, которые мы используем для одного из наших сервисов.

  • Интенсивность отправки сервисом сообщений на узел Flume. График строится не по метрикам узла — эти значения в Graphite отправляют сервисы, которые генерируют данные и являются отправной точкой нашей транспортной системы. Если ваши источники данных не позволяют отслеживать отправку данных во Flume, то похожие графики можно снять с источника(-ов) узла.

    Если значение на этом графике падает до нуля, значит клиент по каким-то причинам не может отправить сообщения во Flume. Чтобы диагностировать, кто виноват в таких ситуациях, мы отдельно отображаем график ошибок, возникающих на стороне клиента. Соответственно, если он отличен от нуля — проблема на узле Flume, источник не может принять данные. Если же падение интенсивности не влечет роста числа ошибок — значит проблема на стороне сервиса, он перестал отправлять сообщения.

  • Заполненность каналов узла. С этим графиком всё просто — он всегда должен быть очень близок к нулевому значению. Если канал не успевает опустошаться, значит где-то в нашей транспортной сети возникло узкое место и необходимо искать узлы, которые не успевают справляться с нагрузкой. Метрика на графике:
    flume.channel.{CHANNEL-NAME}.channel-fill-percentage

  • Интенсивность работы стоков узла. Ожидаемые показатели стоков на этом узле — «сколько получили, столько и отправили», поскольку события от сервисов не дублируются в каналы. Таким образом, интенсивность опустошения стоков должна быть такой же, как и интенсивность отправки данных клиентами. Метрика на графике:
    flume.sink.{SINK-NAME}.event-drain-success-count

    Падение интенсивности любого из стоков до нуля говорит о потенциальной проблеме на следующем, принимающем, узле. Как следствие, канал, опустошаемый «поломанным» стоком, начнет заполняться. Также возможна ситуация, при которой принимающие узлы работают нормально, но просто не успевают обрабатывать входные данные — в этом случае графики стоков будут ненулевыми, но каналы будут постепенно заполняться.

  • Создание собственных компонентов Flume
    Несмотря на то, что набор стандартных компонентов Flume довольно обширен, довольно часто возникают ситуации, разрешить которые этим стандартными компонентами невозможно. В этом случае можно написать свой компонент Flume и задействовать его в узлах. Свою реализацию можно написать для любого из компонентов Flume — sink, source, channel, interceptor и т.п.

    Первое, что бросилось в глаза при изучении стоков Flume — это отсутствие гибкого стока для файловой системы. Да, есть File-Roll Sink, возможности которого описывались во 2й части цикла. Но этот сток полностью лишен возможности как-либо влиять на имя файла, что не очень удобно.

    Мы решили разработать свой сток, позволяющий формировать файлы в локальной файловой системе. При разработке руководствовались следующими соображениями.

    • У нас довольно много сервисов со сравнительно небольшой нагрузкой. Это значит, что в итоге у нас будет довольно много разнородных файлов — не хотелось бы под каждый из них настраивать отдельный сток.
    • Файлы должны ротироваться по времени. Причем, для различных данных период ротации может отличаться (под ротацией имеется ввиду «нарезка данных» на файлы по времени — 15 минут, час и т.п.).
    • Данные от каждого сервиса должны складироваться в отдельную папку. Причем, один сервис может генерировать данные для нескольких подпапок.

    Исходя из этих тезисов, мы пришли к выводу, что задачу формирования имени файла лучше оставить клиенту (т.е. сервисам, генерирующим данные), иначе конфигурация стока будет слишком громоздкой, а для каждого нового «клиента» придется добавлять отдельный сток с индивидуальными настройками.

    Примечание. Здесь уместно сравнение с HDFS-стоком, о котором мы говорили в предыдущей статье. Этот сток позволяет выполнить очень тонкую настройку ротации и именования файлов. Но эта гибкость настройки имеет и обратную сторону — например, для файлов, ротирующихся раз в 15 и раз в 30 минут приходится делать различные стоки, даже если во всем остальном параметры идентичны.

    Итого, решение по функциональности файлового стока было принято следующее:

    • Имя файла, в который должны быть записаны данные, определяет клиент и передает его в заголовке вместе с событием.
    • В имени файла могут быть указаны подпапки.
    • Файлы, в которые ведется запись стоком, закрываются по некоторому таймауту, когда для них перестают приходить события.

    Схематично процесс обработки данных этим стоком выглядит так:

    Что это дало в итоге:

    • Не требуется добавлять сток для каждого нового сервиса или типа данных.
    • Сток лишен издержек на формирование имени файла (в предыдущей части мы рассматривали эти издержки на примере HDFS-стока)
    • Поскольку имя файла однозначно идентифицируется одним заголовком, мы можем пользоваться группировкой событий на стороне клиента (этот прием также описан во второй части цикла).

    Исходный код файлового стока и пример его конфигурации выложены на GitHub. Подробно разбирать процесс разработки этого стока, я думаю, смысла нет, ограничусь несколькими тезисами:

    • За основу компонента берется либо абстрактный класс, либо интерфейс компонента (в зависимости от того, что вы разрабатываете — сток, перехватчик или что-то еще).
    • Делаем собственную реализацию — проще всего взять что-нибудь из готовых Flume-компонентов в качестве примера, благо все исходники доступны.
    • При конфигурации указываем не зарезервированный псевдоним компонента (типа ‘avro’ или ‘logger’), а имя класса целиком.

    Проектируем транспортную сеть
    Общие приемы управления данными мы рассмотрели в предыдущих частях цикла — события можно делить между узлами, дублировать, выбирать «направление движения», используя заголовки и т.п. Попробуем теперь использовать все эти приемы для того, чтобы построить надежную транспортную сеть. Предположим, задача выглядит следующим образом.

  • Поставщик данных — некий сервис, работающий на нескольких машинах (имеет несколько одинаковых инстансов).
  • Данные, генерируемые сервисом, разнородны — часть из них нужно доставить в HDFS, часть — в файловую систему на некий лог-сервер.
  • Необходимо в режиме реального времени вести некоторые неатомарные вычисления, связанные с данными.
  • На условии 3 остановлюсь подробнее. Предположим, что задача состоит в подсчете уникальных пользователей сайта за последний час. В этом случае мы не можем позволить себе распараллелить поток данных с машин или вычислять это значение отдельно на каждом веб-сервисе — вести подсчет уникальных пользователей по их кукам необходимо на едином потоке данных со всех машин. В противном случае каждый инстанс будет иметь свой набор уникальных пользователей, которые нельзя «взять и сложить» для получения конечного результата.

    Примечание. Разумеется, пример немного надуман — эту задачу можно решить и другими, более эффективными способами. Суть примера сводится к ситуации, когда вам необходимо пропускать некоторый поток данных централизовано через один обработчик и разделить нагрузку невозможно из-за особенностей задачи.

    Итак, для начала подготовим клиентские и конечные узлы:

    Для каждого из веб-сервисов ставим свой, индивидуальный узел — на той же машине, что и веб-сервис. Делается это по следующим причинам:

    • Эти узлы играют роль буфера — если по какой-то причине доставка событий на другие машины станет невозможна, эти узлы позволят некоторое время «продержаться» без потерь данных за счет толстого файлового канала.
    • Уменьшается время отклика. Разумеется, отправка данных во Flume должна выполняться асинхронно — но при пиковых нагрузках или забитой сети может возникнуть ситуация, когда фоновый поток не будет успевать отправлять новые события. В этом случае очередь на отправку может сильно вырасти, безжалостно поглощая память сервиса, что не очень хорошо. В случае, когда узел расположен на той же машине, что и сервис, эти издержки значительно снижаются.
    • Если дальнейшая логика обработки данных изменится и вы решите перестроить транспортную сеть, то изменения нужно будет внести только в конфигурацию клиентского узла, а не веб-сервиса. Для него все останется по-прежнему — «я отправляю данные на свой узел, дальше пусть сам решает, как быть».

    Остается вопрос — как доставить данные так, чтобы ничего не потерять, если что-то сломается? Ряд мер мы уже предприняли — данные для HDFS и для FS пишутся на несколько машин. При этом данные не дублируются, а делятся. Таким образом, если одна из конечных машин выходит из строя, вся нагрузка пойдет на оставшуюся в живых. Следствием такой поломки будет дизбаланс по записанному объему данных на различных машинах, но с этим вполне можно жить.

    Чтобы обеспечить бОльшую стабильность, добавим несколько промежуточных узлов Flume, которые будут заниматься непосредственно распределением данных:

    Получилась довольно жуткая паутина. Что здесь происходит:

  • Веб-сервис отправляет события на клиентский узел.
  • Каждое событие имеет заголовок, который указывает «пункт назначения» (например dist=FS или dist=HDFS), а также заголовок uniq с возможными значениями 1 или 0.
  • Каждый клиентский узел имеет по 3 стока, которые равноправно опустошают канал и равномерно распределяют события между тремя промежуточными узлами — Splitter’ами (пока что без оглядки на заголовок dist).
  • Каждый Splitter имеет несколько каналов — для HDFS, FS и счетчика уникальных пользователей. Необходимый канал выбирается по заголовкам dist и uniq.
  • Каждый из этих каналов на Splitter’e имеет несколько стоков, которые равномерно распределяют события между конечными машинами (FS, HDFS или UNQ).
  • Если с клиентскими узлами всё относительно просто — они просто делят события между Splitter’ами, то структуру отдельно взятого Splitter’a стоит рассмотреть более детально.


    Здесь видно, что конечный пункт для данных определяется с помощью заголовка dist. При этом, события, по которым считаются уникальные пользователи, не зависят от заголовка dist — они ориентируются на заголовок uniq. Это значит, что некоторые события могут быть продублированы в несколько каналов, например HDFS и UNQ.

    Ранее я специально не указал направления от Splitter’ов к узлам UNQ. Дело в том, что эти узлы не принимают распределенные данные, как HDFS или FS. Учитывая специфику задачи подсчета уникальных пользователей, весь поток данных должен проходить только через одну машину. Закономерный вопрос — зачем нам тогда 2 узла для подсчета уникальных пользователей? Ответ — потому что если один узел сломается, его будет некому заменить. Как здесь быть — делить события между узлами мы не можем, оставить один — тоже нельзя?

    Здесь нам может помочь еще один инструмент Flume, позволяющий работать стокам в группе по принципу «Если сток 1 сломался, используй сток 2». Этот компонент называется Failover Sink Processor. Его конфигурация выглядит следующим образом:

    # Сами по себе стоки описываются как обычно
    agent.sinks.sink-unq-1.type = avro
    agent.sinks.sink-unq-1.batch-size = 5000
    agent.sinks.sink-unq-1.channel = memchannel
    agent.sinks.sink-unq-1.hostname = unq-counter-1.my-company.com
    agent.sinks.sink-unq-1.port = 50001
    agent.sinks.sink-unq-2.type = avro
    agent.sinks.sink-unq-2.batch-size = 5000
    agent.sinks.sink-unq-2.channel = memchannel
    agent.sinks.sink-unq-2.hostname = unq-counter-2.my-company.com
    agent.sinks.sink-unq-2.port = 50001
    # Объединяем их в группу
    agent.sinkgroups = failover-group
    agent.sinkgroups.failover-group.sinks = sink-unq-1 sink-unq-2
    # Тип процессинга указываем как failover
    agent.sinkgroups.failover-group.processor.type = failover
    # Приоритеты для стоков — сток с высоким значением будет задействован первым
    agent.sinkgroups.failover-group.processor.priority.sink-unq-1 = 10
    agent.sinkgroups.failover-group.processor.priority.sink-unq-2 = 1
    # Как часто проверять — вернулся ли сток в строй (мс)
    agent.sinkgroups.failover-group.processor.maxpenalty = 10000
    Приведенная выше настройка группы потоков позволяет использовать только один сток, но при этом иметь «запасной» на случай аварии. Т.е. покуда сток с высоким приоритетом исправно работает, стоки с низким приоритетом будут простаивать.

    Таким образом, поставленная задача выполнена — данные распределяются между HDFS и FS, счетчики уникальных пользователей работают корректно. При этом выход из строя любой машины не приведет к потере данных:

  • Если ломается машина c Web-сервисом, то эта проблема решается балансером.
  • Если из строя вышел один из Splitter’ов, нагрузка будет распределена между остальными.
  • Конечные узлы также задублированы, поломка одного из них не приведет к застою или потере данных.
  • Узел для подсчета уникальных пользователей имеет «дублёра» и в случае поломки будет заменен им без нарушения логики обработки данных.
  • Для такой схемы задачи масштабирования сводятся к простому изменению конфигурации узлов Flume для соответствующего уровня узлов (Client, Splitter или Final):

  • Новый инстанс Web-сервиса — не требуется изменения конфигурации, он просто устанавливается вместе с клиентским узлом Flume.
  • Новый Splitter — требуется изменить только конфигурацию клиентских узлов, добавив новый сток.
  • Новый конечный узел — требуется изменить только конфигурацию Splitter’a, добавив новый сток.
  • Заключение
    На этом мы завершаем цикл статей про Apache Flume. Мы осветили все самые ходовые его компоненты, разобрались как управлять потоком данных и рассмотрели пример полноценной транспортной сети. Тем не менее, возможности Flume не исчерпываются всем этим — в стандартном пакете есть еще довольно много не рассмотренных нами компонентов, которые могут значительно облегчить жизнь при решении определенных задач. Надеемся, что этот цикл статей помог вам познакомиться с Flume и получить достаточно полное представление о нём.

    Спасибо за внимание!

    Warning: include(/volume1/web/cyberhost.biz/wp-content/plugins/jaster_cahce/cache/bottom-cache.php): failed to open stream: No such file or directory in /volume1/web/cyberhost.biz/index.php on line 13 Call Stack: 0.0000 356272 1. {main}() /volume1/web/cyberhost.biz/index.php:0 Warning: include(): Failed opening '/volume1/web/cyberhost.biz/wp-content/plugins/jaster_cahce/cache/bottom-cache.php' for inclusion (include_path='.:/usr/share/pear') in /volume1/web/cyberhost.biz/index.php on line 13 Call Stack: 0.0000 356272 1. {main}() /volume1/web/cyberhost.biz/index.php:0