|
| ||||||||||||
| ||||||||||||
|
2010 г.
Интеграция Hadoop и параллельной СУБДЮ Ксу, Пекка Костамаа, Лайк Гао
3. Выборка данных EDW в программах MapReduceВ этом разделе мы обсуждаем подход TeradataInputFormat, позволяющий программам MapReduce напрямую читать данные Teradata EDW через драйверы JDBC без потребности в каких-либо внешних шагах экспортирования (из Teradata EDW) и загрузки данных в Hadoop. Прямолинейный подход, обеспечивающий программам MapReduce доступ к реляционным данным, состоит в том, что сначала используется утилита СУБД для экспорта результатов требуемых SQL-запросов в локальный файл, а затем этот локальный файл загружается в Hadoop (или результаты запросов используются в потоковом стиле без потребности в промежуточном файле). Однако программисты MapReduce часто считают более удобным и продуктивным прямой доступ к реляционным данным из своих программ MapReduce без привлечения внешних шагов экспортирования данных из СУБД (для чего требуется знание скриптового языка экпорта данных) и их загрузки в Hadoop.Признавая потребность интеграции реляционных данных в программах Hadoop MapReduce, компания-стартап Cloudera [6], которая специализируется на коммерциализации продуктов и сервисов, связанных с Hadoop, обеспечивает несколько Java-классов (в основном, DBInputFormat [7]), входящих теперь в основной дстрибутив Hadoop и позволяющих программам MapReduce посылать SQL-запросы через стандартный интерфейс JDBC для параллельного доступа реляционных данных. Поскольку наш подход TeradataInputFormat инспирирован подходом DBInputFormat (но не основывается на нем), мы сначала кратко опишем, как работает подход DBInputFormat, а затем обсудим подход TeradataInputFormat. 3.1. DBInputFormatОсновная идея состоит в том, что программист MapReduce через класс DBInputFormat представляет SQL-запрос. Последующее выполнение производится реализацией DBInputFormat и является прозрачным для программиста MapReduce. Класс DBInputFormat ассоциирует некоторый модифицированный SQL-запрос с каждым Mapper'ом, запущенным Hadoop. Затем каждый Mapper посылает запрос в СУБД через стандартный драйвер JDBC, получает в ответ часть результатов запроса и в параллель с другими Mapper'ами обрабатывает результаты. Подход DBInputFormat является корректным, поскольку объединение всех запросов, посылаемых всеми Mapper'ами, эквивалентно исходному SQL-запросу.В подходе DBInputFormat обеспечиваются два интерфейса для обеспечения прямого доступа программам MapReduce к данным из СУБД. Мы посмотрели на исходный код реализации подхода DBInputFormat. Основная реализация является одной и той же для обоих интерфейсов. Эту реализацию можно резюмировать следующим образом. В первом интерфейсе программа MapReduce обеспечивает имя пользователя, пароль и URL СУБД, а также имя таблицы T, список P имен столбцов, которые следует выбрать, необязательные фильтрующие условия C и список имен столбцов O для использования в разделе SELECT P FROM T WHERE C ORDER BY O LIMIT L OFFSET X (Q) При получении запроса Q СУБД реально выполняет запрос Во втором интерфейсе класса DBInputFormat программа MapReduce может предоставить произвольный SQL-запрос SQ на выборку данных, результаты которого являются входными данными для Mapper'ов. В этом случае программа MapReduce должна предоставить и запрос со счетчиком (count query) QC, который должен возвращать целочисленное значение, являющееся числом строк в результате запроса SQ. Класс DBInputFormat посылает в СУБД запрос QC, чтобы получить число строк (R), а дальнейшая обработка – та же самая, что и в первом интерфейсе. Хотя понятно, что подход DBInputFormat, обеспечиваемый компанией Claudera, упрощает процесс доступа к реляционным данным, он не обеспечивает должного роста
производительности при увеличении числа Mapper'ов. С подходом DBInputFormat связано несколько проблем производительности. В обоих интерфейсах каждый Mapper для получения своего поднабора реляционных данных посылает в СУБД, по существу, один и тот же запрос, только с разными значениями в разделах Отмеченные проблемы производительности особенно серьезны для параллельной СУБД, в которой, как правило, имеются много одновременно выполняемых запросов и крупные наборы данных. Кроме того, требуемое упорядочивание/сортировка – это дорогостоящая операция в параллельных СУБД, поскольку строки таблицы не сохраняются в каком-либо одном узле, и для сортировки требуется перераспределение строк по узлам. 3.2. TeradataInputFormatОсновная идея нашего подхода заключается в том, что коннектор Teradata для Hadoop (TeradataInputFormat) посылает в Teradata EDW SQL-запрос Q, обеспечиваемый программой MapReduce, только один раз, и результаты сохраняются в некоторой PPI-таблице T (PPI – Partitioned Primary Index, разделенный первичный индекс). После этого каждый Mapper системы Hadoop посылает новый запрос Qi, в котором всего лишь в каждом AMP запрашивается i-ый раздел.Обсудим теперь нашу реализацию более подробно. Прежде всего, класс TeradataInputFormat посылает в Teradata EDW следующий запрос P, основанный на запросе Q, который предоставляется программой MapReduce. CREATE TABLE T AS (Q) WITH DATA PRIMARY INDEX ( c1 ) PARTITION BY (c2 MOD M) + 1 (P) В этом запросе требуется, чтобы в Teradata EDW был выполнен запрос Q, и чтобы результаты были сохранены в новой PPI-таблице T. Хэш-значение столбца первичного индекса c1 каждой строки результата запроса определяет, в каком AMP должна храниться эта строка. После этого значение выражения, указанного в разделе Затем каждый Mapper посылает в Teradata EDW следующий запрос Qi (1 ≤ i ≤ M): SELECT * FROM T WHERE PARTITION = i (Qi) Teradata EDW напрямую параллельно определит местоположение всех строк i-го раздела каждого AMP и вернет эти строки i-му Mapper'у. Эта операция выполняется параллельно для всех Mapper'ов. После того как все Mapper'ы получат свои данные, таблица T удаляется. Заметим, что если в исходном SQL-запросе всего лишь выбираются данные из базовой таблицы, которая является PPI-таблицей, то мы не создаем еще одну PPI-таблицу (T), поскольку можем непосредственно использовать существующие разделы для разделения данных, которые должен получить каждый Mapper. В настоящее время у PPI-таблицы в Teradata EDW должен иметься столбец первичного индекса. Поэтому при вычислении запроса P системе Teradata EDW требуется разделить результаты запроса между всеми AMP в соответствии со значениями столбца первичного индекса. Одной из возможных в будущем оптимизаций является параллельное построение разделов результатов запроса в каждом AMP без перемещения результатов SQL-запроса Q между AMP. Еще одна возможная оптимизация состоит в том, что для построения M разделов нам в действительности не требуется сортировать строки в каком-либо AMP на основе значений выражения Заметим, что данные, выбираемые программой MapReduce через интерфейс TeradataInputFormat, не сохраняются в Hadoop после завершения программы MapReduce (если только их не сохранит сама программа MapReduce). Поэтому, если какие-то данные Teradata EDW часто используются многими программами MapReduce, более эффективно будет скопировать эти данные и материализовать их в Hadoop в виде файлов Hadoop DFS. В зависимости от числа Mapper'ов, сложности SQL-запроса, предоставляемого программой MapReduce, и объема данных, затрагиваемых этим SQL-запросом, производительность подхода TeradataInputFormat, очевидным образом, может на порядки величин превышать производительность подхода DBInputFormat, что подтверждается предварительными результатами тестирования. Подход TeradataInputFormat, описанный в этом подразделе, можно назвать подходом, основанным на горизонтальном разделении, в том смысле, что каждый Mapper выбирает часть результатов запроса из каждого AMP (узла). В настоящее время мы исследуем подход, основанный на вертикальном разделении, когда несколько Mapper'ов выбирают данные только из одного AMP при M > A (M – число Mapper'ов, запущенных Hadoop, и A – число AMP в Teradata EDW), или когда каждый Mapper выбирает данные из некоторого подмножества AMP при M < A, или когда каждый Mapper выбирает данные из одного и только одного AMP при M = A. Для реализации подхода, основанного на вертикальном разделении, в текущем варианте Teradata EDW требуется больше изменений, чем для реализации подхода, основанного на горизонтальном разделении. Мы предполагаем, что производительность любого из этих подходов не всегда будет превосходить производительность другого подхода. 4. Доступ к данным Hadoop из SQL с использованием табличной UDFВ этом разделе мы опишем, как можно обеспечить прямой доступ к данным Hadoop через SQL-запросы и использовать эти данные совместно с реляционными данными Teradata EDW для выполнения интегрированного анализа данных. Мы обеспечиваем табличную UDF (User Defined Function – функцию, определяемую пользователями), называемую HDFSUDF, которая "вытягивает" данные из Hadoop в Teradata EDW. Например, в следующем SQL-запросе вызывается HDFSUDF для загрузки данных из файла Hadoop с именемmydfsfile.txt в таблицу Tab1 в Teradata EDW:
INSERT INTO Tab1 SELECT * FROM TABLE(HDFSUDF (‘mydfsfile.txt’)) AS T1; Заметим, что после создания табличной UDF HDFSUDF и предоставления ее пользователям она вызывается подобно любой другой UDF. Для пользователей этой табличной UDF несущественно, каким образом данные перемещаются из Hadoop в Teradata EDW. Обычно табличная UDF HDFSUDF пишется таким образом, чтобы при ее вызове из SQL-запроса она выполнялась в каждом AMP. Однако ее можно написать и таким образом, чтобы при вызове из SQL-запроса она выполнялась в каком-либо одном AMP или в какой-либо группе AMP. Каждый экземпляр HDFSUDF, выполняемый в некотором AMP, отвечает за извлечение некоторой части файла Hadoop. Табличная функция HDFSUDF может также производить фильтрацию и преобразование данных по мере того, как эта функция доставляет строки в процессор SQL. Примерный код HDFSUDF и другие подробности доступны на Web-сайте Teradata Developer Exchange [1]. Когда в некотором AMP запускается экземпляр UDF, этот экземпляр связывается с NameNode в Hadoop, который заведует метаданными относительно Для любого запроса от экземпляров UDF к системе Hadoop NameNode устанавливает, какие DataNode в Hadoop отвечают на возврат требуемых данных. Экземпляр табличной UDF, выполяемый на некотором AMP, получает данные непосредственно от тех DataNode, которые сохраняют требуемые блоки данных. Заметим, что никакие данные из файла Hadoop никогда не маршрутизируются через NameNote. Все это делается напрямую от одного узла другому узлу. В нашей примерной реализации [1] мы просто вынуждаем N-ый AMP в системе загружать N-ую порцию файла Hadoop. В зависимости от потребностей приложений можно обеспечить другие типы отображений. При принятии решения о том, какую часть файла следует загружать каждому AMD с использованием табличной UDF, нужно убедиться, что, в конечном счете, все экземпляры UDF прочитают все байты файла Hadoop, и каждый байт будет прочитан только один раз. Поскольку каждый AMP запрашивает данные из Hadoop, посылая в своем запросе смещение в байтах до позиции файла, с которого должно начаться чтение, нам требуется гаантировать, что последняя строка, прочитанная каждым AMP, является полной, а не частичной строкой (если экземпляры UDF обрабатывают входной файл в режиме "строка за строкой"). В нашей примерной реализации [1] у файла Hadoop, который требуется загрузить, строки имеют фиксированный размер; поэтому мы можем простым образом вычислить начальное и конечное смещение в байтах требуемой порции данных для любого AMP. В зависимости от формата входного файла и потребностей приложений назначению каждому AMP соответствующей порции файла может потребоваться уделять более серьезное внимание. После загрузки данных Hadoop в Teradata мы можем анализировать набор данных Hadoop точно так же как любые другие данные, сохраняемые в EDW. Более интересно то, что мы можем выполнять интегрированный анализ данных над реляционными данными, хранимыми в Teradata EDW, и внешними данными, исходно сохранявшимися в Hadoop, без потребности в создании новой таблицы и загрузке в нее данных Hadoop. Это демонстрируется в следующем примере. Предположим, что у некоторой телекоммуникационной компании имеется файл Hadoop
SELECT watchlist.source-id,
count(distinct(T.dest-id)) as Total
FROM watchlist, TABLE(HDFSUDF(’packets.txt’)) AS T
WHERE watchlist.source-id=T.source-id
GROUP BY watchlist.source-id
HAVING Total > 1000000
Приведенный пример показывает, что мы можем использовать подход табличной UDF для обеспечения простой возможности выполнения сложного анализа с применением процессора SQL над данными Hadoop и реляционными данными. В настоящее время мы работаем над более развитой версией HDFSUDF [1], позволяющей пользователям SQL объявлять отображение схем между файлами Hadoop и таблицами SQL, а также фильтровать и трансформировать данные с применением высокоуровневых конструкций SQL без потребности в написании кода на языке Java. 5. Родственные работыMapReduce вызывает огромный интерес как в индустрии, так и в академических кругах. Одно из направлений исследований состоит в повышении мощности или выразительности модели программирования MapReduce. В [19] предлагается добавить к набору примитивов MapReduce новый примитивMERGE, чтобы облегчить выполнение соединений в среде MapReduce, поскольку в MapReduce реализация соединений затруднительна. Pig Latin [14, 9] – это новый язык, разработанный Yahoo! в качестве "золотой середины" между декларативным стилем SQL и низкоуровневым процедурным стилем MapReduce. В проекте Facebook Hive [17] – разрабатывается решение с открытыми исходными кодами для построения хранилищ данных на основе Hadoop. В Hive обеспечивается SQL-подобный декларативный язык HiveQL, который компилируется в задания MapReduce, выполняемые в Hadoop.
В то время как [14, 9, 17, 4] помогают интегрировать конструкции декларативных запросов из РСУБД в MapReduce-подобную среду программирования для поддержки автоматической оптимизации запросов, достижения более высокой продуктивности программирования и большей выразительности запросов, другое направление исследований состоит в том, что исследователи и производители программных продуктов управления базами данных пытаются внедрить лучшие черты MapReduce, включая дружественность по отношению к пользователям и отказоустойчивость, в реляционные базы данных. HadoopDB [3] – это гибридная система, целью разработчиков которой является объединение лучших черт Hadoop и РСУБД. Основная идея HadoopDB состоит в соединении нескольких одноузловых систем баз данных (PostgreSQL) с использованием Hadoop в качестве координатора задач и уровня сетевых коммуникаций. Greenplum и Aster Data позволяют пользователям писать функции в стиле MapReduce над данными, хранимыми под управлением их параллельных систем [12]. Родственной работой по отношению к описанному в разд. 3 подходу TeradataInputFormat является реализация VerticaInputFormat компании Vertica [18], обеспечивающая программам MapReduce прямой доступ к реляционным данным, хранимым в параллельной СУБД Vertica (эта реализация также инспирирована DBInputFormat [7], но не основана на реализации данного интерфейса). Однако в реализации Vertica (как и в реализации DBInputFormat) в СУБД посылается столько SQL-запросов (в каждом из которых, как и в подходе DBInputFormat, к SQL-запросу, предоставленному пользователем, добавляется один раздел В нашем подходе TeradataInputFormat каждый Mapper также случайным образом подключается к узлу Teradata EDW. Однако, по нашему опыту, это не приводит к существенному повышению производительности программ MapReduce, поскольку все запросы параллельно выполняются во всех узлах, независимо от того, в какой узел посылался конкретный запрос. Ключевым фактором высокой производительности подхода TeradataInputFormat является то, что специфицируемые пользователями запросы выполняются только один раз, а не столько раз, сколько имеется Mapper'ов, как это происходит в DBInputFormat и VerticaInputFormat. Дополнительным (не всегда применимым) оптимизационным приемом в VerticaInputFormat является то, что когда пользователь задает параметризованный SQL-запрос типа 6. ЗаключениеИсследования, связанные с MapReduce, продолжают активно развиваться и вызывают интерес как в индустрии, так и в академических кругах. Подход MapReduce особенно интересен для производителей параллельных СУБД, поскольку и в MapReduce, и в РСУБД используются кластеры узлов и масштабируемая технология анализа данных. Крупные заказчики Teradata все чаще сталкиваются с потребностью выполнения интегрированного анализа данных, хранимых и в среде Hadoop, и в Teradata EDW. Мы представили три исследовательские работы, направленные на достижение тесной интеграции Hadoop и Teradata EDW.Наш подход DirectLoad обеспечивает быструю параллельную загрузку данных Hadoop в Teradata EDW. Наш подход TeradataInputFormat дает программам MapReduce возможность эффективного и прямого параллельного доступа к данным Teradata EDW без потребности во внешних шагах экспортирования и загрузки данных из Teradata EDW в Hadoop. Мы также продемонстрировали, каким образом пользователи SQL могут напрямую обращаться к данным Hadoop и соединять их с данными Teradata EDW с применением определяемых пользователями функций. Хотя результаты работ, описанных в этой статье, могут удовлетворить потребности большого числа заказчиков Teradata, нуждающихся в совместном использовании данных Hadoop и Teradata EDW в своей среде корпоративного хранилища данных, имеется еще много проблем, над решением которых мы продолжаем работать. Одной из проблем, которые нас более всего интересуют, является возможность переноса большего объема вычислений из Hadoop в Teradata EDW и из Teradata EDW в Hadoop. |
|
CITForum © 1997–2025