MapReduce
| Описание | MapReduce — модель распределённых вычислений, представленная компанией Google, используемая для параллельных вычислений над очень большими, вплоть до нескольких петабайт, наборами данных в компьютерных кластерах. |
|---|---|
| Область знаний | Большие данные |
| Авторы | |
| Поясняющее видео | |
| Близкие понятия | |
| Среды и средства для освоения понятия | Hadoop |
MapReduce — это фреймворк для вычисления некоторых наборов распределенных задач с использованием большого количества компьютеров (называемых «нодами»), образующих кластер.
Работа MapReduce состоит из двух шагов: Map и Reduce, названных так по аналогии с одноименными функциями высшего порядка, map и reduce.
Map
На Map-шаге происходит предварительная обработка входных данных. Для этого один из компьютеров (называемый главным узлом — master node) получает входные данные задачи, разделяет их на части и передает другим компьютерам (рабочим узлам — worker node) для предварительной обработки.
Стадия Map зачастую представляет собой обычное чтение с жёсткого диска. Кроме чтения здесь могут применяться однострочные трансформации и фильтры, т.е. операции без join, group by, order by, distinct и без агрегирующих функций. Данные зачастую сохранены равномерно на каждом сервере. Все серверы участвуют в этой стадии и делят нагрузку равномерно.
Reduce
На Reduce-шаге происходит свёртка предварительно обработанных данных. Главный узел получает ответы от рабочих узлов и на их основе формирует результат — решение задачи, которая изначально формулировалась.
Здесь происходят все группирующие операции, а также операции, которые записывают результат. Некоторые операции не могут выполняться одновременно на нескольких серверах, поэтому для их выполнения требуется собрать весь объем данных на одном сервере.
Пример
Канонический пример приложения, написанного с помощью MapReduce, — это процесс, подсчитывающий, сколько раз различные слова встречаются в наборе документов:
// Функция, используемая рабочими нодами на Map-шаге
// для обработки пар ключ-значение из входного потока
void map(String name, String document):
// Входные данные:
// name - название документа
// document - содержимое документа
for each word in document:
EmitIntermediate(word, "1");
// Функция, используемая рабочими нодами на Reduce-шаге
// для обработки пар ключ-значение, полученных на Map-шаге
void reduce(Iterator partialCounts):
// Входные данные:
// partialCounts - список группированных промежуточных результатов. Количество записей в partialCounts и есть
// требуемое значение
int result = 0;
for each v in partialCounts:
result += parseInt(v);
Emit(AsString(result));
Анализ логов веб-сервера
Анализ логов для подсчёта количества обращений к каждому URL..
// from mrjob.job import MRJob
import re
class MRLogAnalyzer(MRJob):
# Mapper: извлекает URL из строки лога и выдает (url, 1)
def mapper(self, _, line):
# Простое регулярное выражение для извлечения URL (GET /path HTTP/...)
match = re.search(r'\"(?:GET|POST) (.*?) HTTP/', line)
if match:
url = match.group(1)
yield (url, 1)
# Reducer: суммирует количество обращений к каждому URL
def reducer(self, key, values):
yield (key, sum(values))
if __name__ == '__main__':
MRLogAnalyzer.run()
Анализ числовых данных (среднее арифметическое)
Этот пример показывает, как MapReduce используется не только для текста, но и для обработки числовых данных, например, для нахождения среднего значения.
// from mrjob.job import MRJob
class MRAverage(MRJob):
# Mapper: выдает ключ 'avg' и пару (значение, 1)
def mapper(self, _, line):
num = float(line.strip())
yield ('avg', (num, 1))
# Reducer: суммирует все значения и счетчики, затем вычисляет среднее
def reducer(self, key, values):
total_sum = 0
total_count = 0
for value, count in values:
total_sum += value
total_count += count
yield ('average', total_sum / total_count)
if __name__ == '__main__':
MRAverage.run()
