Custom data warehouse on top of MapReduce
First Claim
1. A computer-implemented method for using a data warehouse comprising a cluster of nodes, comprising:
receiving, by a query processor on a first node of said data warehouse, a particular query, wherein said particular query comprises a DATES clause that specifies date ranges and a FROM clause that specifies at least one virtual view, wherein said at least one virtual view has a nested relational structure and is constructed on the fly by performing a per-node in-memory hash join between one or more partitions of a nested relational fact table and one or more dimension tables;
translating, by said query processor, said particular query into a corresponding map phase and a corresponding reduce phase, wherein said translating further comprises the steps of:
determining, by said query processor, one or more fact table partitions based in part on partitioning by the date ranges specified in said particular query; and
determining and applying particular optimization techniques, based on said particular query;
at each of said one or more other nodes of said data warehouse, each node having one or more of said fact table partitions and having said one or more dimensions tables, creating the at least one virtual view by joining said one or more fact tables with attributes of any of said one or more dimension tables, said joining based on said particular query, wherein when there are two or more virtual views in the FROM clause, performing a union on all the rows from each individual virtual view and when a specific column referred to in the query does not exist in the view, treat the specific column as a NULL value;
performing, by a scan processor at said each node of said one or more nodes, a scan over said at least one virtual views based on said particular query and any of said particular optimization techniques;
performing, by an aggregation processor at said each node of said one or more nodes, local aggregation of said scanned results;
at a third node of said one or more nodes, performing global aggregation of said local aggregations to generate search results of said particular query for further post-processing; and
performing a SELECT on particular columns from said one or more fact table partitions and not from any of said dimension tables, creating a materialized view by joining results of said SELECT on particular columns with said dimension tables; and
using said materialized view at run-time when:
said particular query refers to a virtual view that corresponds to said one or more fact table partitions from which said particular columns were selected;
non-aggregate columns referenced in SELECT, WHERE, GROUP BY clauses in said particular query are a subset of group by columns of said materialized view; and
aggregate columns are computable from aggregate columns of said materialized view.
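The three run-time conditions for using a materialized view can be sketched as a simple eligibility check. This is a minimal illustration, not the patented implementation: the class names, field names, and the rule that AVG is computable from SUM and COUNT are assumptions introduced here for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Aggregate:
    func: str    # e.g. "SUM", "COUNT", "MIN", "MAX", "AVG"
    column: str


@dataclass(frozen=True)
class MaterializedView:
    base_view: str         # virtual view whose fact partitions were selected
    group_by: frozenset    # group-by columns of the materialized view
    aggregates: frozenset  # Aggregate entries stored in the materialized view


@dataclass(frozen=True)
class Query:
    view: str                         # virtual view named in FROM
    non_aggregate_columns: frozenset  # columns in SELECT, WHERE, GROUP BY
    aggregates: frozenset             # Aggregate entries the query asks for


def _computable(agg, available):
    """Assumed rule: an aggregate is computable if it is stored directly,
    or if it is an AVG whose SUM and COUNT are both stored."""
    if agg in available:
        return True
    if agg.func == "AVG":
        return (Aggregate("SUM", agg.column) in available
                and Aggregate("COUNT", agg.column) in available)
    return False


def can_use_materialized_view(query, mv):
    """Check the three conditions listed in the claim."""
    same_view = query.view == mv.base_view
    columns_covered = query.non_aggregate_columns <= mv.group_by
    aggregates_ok = all(_computable(a, mv.aggregates) for a in query.aggregates)
    return same_view and columns_covered and aggregates_ok
```

A query that groups by a column missing from the materialized view fails the second condition, so it would fall back to scanning the virtual view.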
Abstract
A method and apparatus for a data warehouse built on top of MapReduce is provided. A virtual view is defined on top of the common star or snowflake data warehouse schema. A SQL-like, but much more succinct, query language is provided to leverage the virtual view. Many advanced query processing features leveraging the virtual view are also provided. Further, several optimization techniques ranging from data compression and access method to multi-query optimization and exploiting materialized views are provided. Further, a technique to seamlessly integrate the data warehouse system into any ad-hoc MapReduce jobs is provided. Thus, fully leveraging the power of both MapReduce and data warehouse technologies is achieved.
18 Claims
9. An apparatus for using a data warehouse comprising a cluster of nodes, comprising:
-
means for receiving, by a query processor on a first node of said data warehouse, a particular query, wherein said particular query comprises a DATES clause that specifies date ranges and a FROM clause that specifies at least one virtual view;
means for translating, by said query processor, said particular query into a corresponding map phase and a corresponding reduce phase, wherein said translating further comprises the steps of:
means for determining, by said query processor, one or more fact table partitions based in part on partitioning by the date ranges specified in said particular query; and
means for determining and applying particular optimization techniques, based on said particular query;
at each of said one or more other nodes of said data warehouse, each node having one or more of said fact table partitions and having one or more dimensions tables, means for creating the at least one virtual view by joining said one or more fact tables with attributes of any of said one or more dimension tables, said joining based on said particular query, wherein when there are two or more virtual views in the FROM clause, performing a union on all the rows from each individual virtual view and when a specific column referred to in the query does not exist in the view, treat the specific column as a NULL value;
means for performing, by a scan processor at said each node of said one or more nodes, a scan over said at least one virtual views based on said particular query and any of said particular optimization techniques;
means for performing, by an aggregation processor at said each node of said one or more nodes, local aggregation of said scanned results;
at a third node of said one or more nodes, means for performing global aggregation of said local aggregations to generate search results of said particular query for further post-processing; and
means for performing a SELECT on particular columns from said one or more fact table partitions and not from any of said dimension tables, means for creating a materialized view by joining results of said SELECT on particular columns with said dimension tables; and
means for using said materialized view at run-time when:
said particular query refers to a virtual view that corresponds to said one or more fact table partitions from which said particular columns were selected;
non-aggregate columns referenced in SELECT, WHERE, GROUP BY clauses in said particular query are a subset of group by columns of said materialized view; and
aggregate columns are computable from aggregate columns of said materialized view.
-
-
10. A non-transitory machine-readable storage medium storing one or more sequences of instructions which, when executed by one or more processors, causes the one or more processors to perform:
-
receiving, by a query processor on a first node of said data warehouse, a particular query, wherein said particular query comprises a DATES clause that specifies date ranges and a FROM clause that specifies at least one virtual view, wherein said at least one virtual view has a nested relational structure and is constructed on the fly by performing a per-node in-memory hash join between one or more partitions of a nested relational fact table and one or more dimension tables;
translating, by said query processor, said particular query into a corresponding map phase and a corresponding reduce phase, wherein said translating further comprises the steps of:
determining, by said query processor, one or more fact table partitions based in part on partitioning by the date ranges specified in said particular query; and
determining and applying particular optimization techniques, based on said particular query;
at each of said one or more other nodes of said data warehouse, each node having one or more of said fact table partitions and having said one or more dimensions tables, creating the at least one virtual view by joining said one or more fact tables with attributes of any of said one or more dimension tables, said joining based on said particular query, wherein when there are two or more virtual views in the FROM clause, performing a union on all the rows from each individual virtual view and when a specific column referred to in the query does not exist in the view, treat the specific column as a NULL value;
performing, by a scan processor at said each node of said one or more nodes, a scan over said at least one virtual views based on said particular query and any of said particular optimization techniques;
performing, by an aggregation processor at said each node of said one or more nodes, local aggregation of said scanned results;
at a third node of said one or more nodes, performing global aggregation of said local aggregations to generate search results of said particular query for further post-processing; and
performing a SELECT on particular columns from said one or more fact table partitions and not from any of said dimension tables, creating a materialized view by joining results of said SELECT on particular columns with said dimension tables; and
using said materialized view at run-time when:
said particular query refers to a virtual view that corresponds to said one or more fact table partitions from which said particular columns were selected;
non-aggregate columns referenced in SELECT, WHERE, GROUP BY clauses in said particular query are a subset of group by columns of said materialized view; and
aggregate columns are computable from aggregate columns of said materialized view.
-
-
11. A non-transitory machine-readable storage medium storing one or more sequences of instructions which, when executed by one or more processors, causes the one or more processors to perform:
wherein further post-processing comprises the steps of computing aggregate values based on said scanned results and performing any other residual expressions in said particular query over said aggregate values.
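The post-processing step described here can be illustrated with a short sketch: once global aggregates exist for each group, any residual expression in the query is evaluated over those aggregate values. The function name, the dictionary layout, and the click-through-rate expression are hypothetical choices made for this example only.

```python
def post_process(global_aggregates, residual_expressions):
    """Evaluate residual expressions over already-computed aggregate values.

    global_aggregates: {group_key: {aggregate_name: value}}
    residual_expressions: {output_name: function(aggregate_dict) -> value}
    """
    results = {}
    for group, aggregates in global_aggregates.items():
        row = dict(aggregates)  # keep the aggregate values themselves
        for name, expression in residual_expressions.items():
            row[name] = expression(aggregates)
        results[group] = row
    return results


# Hypothetical data: a ratio of two SUM aggregates per country.
aggregates = {"US": {"clicks": 25, "impressions": 100}}
residuals = {"ctr": lambda a: a["clicks"] / a["impressions"]}
report = post_process(aggregates, residuals)
```

The point of the split is that the ratio cannot be aggregated locally per node; it only becomes meaningful after the sums have been combined.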
-
12. A non-transitory machine-readable storage medium storing one or more sequences of instructions which, when executed by one or more processors, causes the one or more processors to perform:
at least one of said fact table partitions comprises at least one nested table; said at least one virtual view is a nested relational virtual view that is created by and used by said scan processor by performing the steps of: un-nesting said at least one nested table for each row; and performing a union over all such un-nested rows; for each row of said union of rows, returning values of at least one column in said at least one nested table; and for each row, applying a user-defined function to said returned values to generate a single value result.
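One possible reading of the un-nesting steps, sketched in Python. It assumes a fact row is a dictionary whose nested table is a list of child dictionaries; the helper names and the dotted column naming are illustrative assumptions, not taken from the patent.

```python
def unnest(rows, nested):
    """Un-nest the nested table of every row and union the results.

    Each child row is flattened together with its parent's columns.
    """
    for row in rows:
        parent = {k: v for k, v in row.items() if k != nested}
        for child in row[nested]:
            flat = dict(parent)
            flat.update({f"{nested}.{k}": v for k, v in child.items()})
            yield flat


def nested_column_values(row, nested, column):
    """Return the values of one nested-table column for a single fact row."""
    return [child[column] for child in row[nested]]


def apply_udf(rows, nested, column, udf):
    """Apply a user-defined function to each row's nested values,
    producing one value per fact row."""
    return [udf(nested_column_values(row, nested, column)) for row in rows]
```

Here any callable that reduces a list to one value (for instance the built-in `sum`) can play the role of the user-defined function.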
-
13. A non-transitory machine-readable storage medium storing one or more sequences of instructions which, when executed by one or more processors, causes the one or more processors to perform:
a scan and before said performing global aggregation, further comprising the step of: performing an ad-hoc user program on said scanned results; wherein said ad-hoc user program is provided in said particular query; and wherein said node comprises a data access primitive (DAP) interface.
-
14. A non-transitory machine-readable storage medium storing one or more sequences of instructions which, when executed by one or more processors, causes the one or more processors to perform:
wherein each n rows in said one or more fact table partitions are stored in a cell and wherein a storage format for each cell comprises: a header that comprises a schema version and a number of rows in said cell; one or more column sets and one or more pointers, each pointer pointing to the beginning of each of said one or more column sets; wherein each of said one or more column sets starts with a compression flag indicating any of said compression types: dictionary encoding; common value encoding; run length encoding; and no compression.
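The cell layout above (header, pointers, column sets that each begin with a compression flag) can be sketched as follows. This is a toy encoding under stated assumptions: the flag values, the byte widths, the JSON payloads, and the column-set count stored in the header are all invented for illustration, and only the "no compression" and "run length encoding" flags are implemented.

```python
import json
import struct

# Hypothetical flag values; the source only names the four compression types.
NONE, DICTIONARY, COMMON_VALUE, RUN_LENGTH = 0, 1, 2, 3
HEADER = "<HII"  # schema version, row count, column-set count (count is an assumption)


def rle_encode(values):
    runs = []
    for v in values:
        if runs and runs[-1][0] == v:
            runs[-1][1] += 1
        else:
            runs.append([v, 1])
    return runs


def rle_decode(runs):
    return [v for v, n in runs for _ in range(n)]


def encode_column_set(values, flag):
    """A column set starts with its compression flag, followed by the payload."""
    if flag not in (NONE, RUN_LENGTH):
        raise NotImplementedError("only NONE and RUN_LENGTH are sketched here")
    payload = rle_encode(values) if flag == RUN_LENGTH else list(values)
    return bytes([flag]) + json.dumps(payload).encode("utf-8")


def decode_column_set(blob):
    flag, payload = blob[0], json.loads(blob[1:].decode("utf-8"))
    return rle_decode(payload) if flag == RUN_LENGTH else payload


def build_cell(schema_version, num_rows, column_sets):
    """Lay out header, then one pointer per column set, then the column sets."""
    header = struct.pack(HEADER, schema_version, num_rows, len(column_sets))
    offset = len(header) + 4 * len(column_sets)
    pointers, body = [], b""
    for blob in column_sets:
        pointers.append(offset)  # points at the beginning of this column set
        body += blob
        offset += len(blob)
    return header + struct.pack(f"<{len(pointers)}I", *pointers) + body


def read_column_set(cell, index):
    """Follow the pointer for one column set and decode only that set."""
    _version, _rows, count = struct.unpack_from(HEADER, cell, 0)
    pointers = struct.unpack_from(f"<{count}I", cell, struct.calcsize(HEADER))
    start = pointers[index]
    end = pointers[index + 1] if index + 1 < count else len(cell)
    return decode_column_set(cell[start:end])
```

Because every column set is reachable through its own pointer, a reader can jump straight to the columns it needs without touching the others.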
-
15. A non-transitory machine-readable storage medium storing one or more sequences of instructions which, when executed by one or more processors, causes the one or more processors to perform:
wherein said creating at least one virtual view further comprises the step of: for each cell, loading said cell in memory and decoding through an iterator interface only column sets of said cell that are referred to in said particular query.
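A minimal sketch of decoding only the referenced column sets through an iterator, assuming a cell is held in memory as a mapping from column name to an encoded value. The `Cell` class, its `decoded` bookkeeping list, and the comma-separated toy encoding are hypothetical and exist only to make the idea concrete.

```python
class Cell:
    """In-memory cell whose column sets are decoded lazily, on request."""

    def __init__(self, encoded_columns, decoders):
        self._encoded = encoded_columns  # {column name: encoded payload}
        self._decoders = decoders        # {column name: payload -> iterable}
        self.decoded = []                # names decoded so far (for inspection)

    def _column_iter(self, name):
        self.decoded.append(name)
        return iter(self._decoders[name](self._encoded[name]))

    def rows(self, referenced):
        """Yield rows containing only the columns the query refers to."""
        iterators = [self._column_iter(name) for name in referenced]
        for values in zip(*iterators):
            yield dict(zip(referenced, values))


# Toy encoding: comma-separated strings, decoded by generator expressions.
cell = Cell(
    {"country": "US,US,DE", "clicks": "1,2,3", "agent": "a,b,c"},
    {
        "country": lambda s: (x for x in s.split(",")),
        "clicks": lambda s: (int(x) for x in s.split(",")),
        "agent": lambda s: (x for x in s.split(",")),
    },
)
scanned = list(cell.rows(["country", "clicks"]))
```

In this example the `agent` column set is never decoded, which is the saving the claim describes for queries that touch only a few columns.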
-
16. A non-transitory machine-readable storage medium storing one or more sequences of instructions which, when executed by one or more processors, causes the one or more processors to perform:
wherein said creating at least one virtual view further comprises the step of: for each cell, loading said cell in memory and decoding through an iterator interface only column sets of said cell that are referred to in said particular query.
-
17. A non-transitory machine-readable storage medium storing one or more sequences of instructions which, when executed by one or more processors, causes the one or more processors to perform:
wherein if a particular column referenced in said particular query does not exist in said at least one virtual view, assigning said particular column NULL values.
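The NULL-filling rule is easy to show when several virtual views are unioned. In this sketch a view is simply an iterable of dictionaries and Python's `None` stands in for NULL; both are assumptions made for the example, as is the function name.

```python
def union_views(views, columns):
    """Union the rows of all views, projecting onto the queried columns.

    A column that a view does not have is returned as None (NULL).
    """
    for view in views:
        for row in view:
            yield {column: row.get(column) for column in columns}


# Hypothetical views: only the first one has a "clicks" column.
impressions = [{"country": "US", "clicks": 3}]
conversions = [{"country": "DE", "revenue": 9.5}]
rows = list(union_views([impressions, conversions], ["country", "clicks"]))
```

Rows from the second view come back with `clicks` set to `None`, so downstream aggregation can treat them the way SQL treats NULL inputs.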
-
18. A computer-implemented method for using a data warehouse comprising a cluster of nodes, comprising the steps of:
-
receiving, by a query processor on a first node of said data warehouse, a particular query, wherein at least one virtual view having a nested relational structure is constructed on the fly by performing a per-node in-memory hash join between one or more partitions of a nested relational fact table and one or more dimension tables;
translating, by said query processor, said particular query into a corresponding map phase and a corresponding reduce phase, wherein said translating further comprises the steps of:
determining, by said query processor, one or more fact table partitions, based on said particular query; and
determining and applying particular optimization techniques, based on said particular query;
at each of said one or more other nodes of said data warehouse, each node having one or more of said fact table partitions and having said one or more dimensions tables, creating at least one virtual view by joining said one or more fact tables with attributes of any of said one or more dimension tables, said joining based on said particular query;
performing, by a scan processor at said each node of said one or more nodes, a scan over said at least one virtual views based on said particular query and any of said particular optimization techniques;
performing, by an aggregation processor at said each node of said one or more nodes, local aggregation of said scanned results;
at a third node of said one or more nodes, performing global aggregation of said local aggregations to generate search results of said particular query for further post-processing; and
performing a SELECT on particular columns from said one or more fact table partitions and not from any of said dimension tables, creating a materialized view by joining results of said SELECT on particular columns with said dimension tables; and
using said materialized view at run-time when:
said particular query refers to a virtual view that corresponds to said one or more fact table partitions from which said particular columns were selected;
non-aggregate columns referenced in SELECT, WHERE, GROUP BY clauses in said particular query are a subset of group by columns of said materialized view; and
aggregate columns are computable from aggregate columns of said materialized view.
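The overall flow of this claim (partition selection, per-node hash join, scan, local aggregation, global aggregation) can be sketched in a few functions. This is a single-process illustration under several assumptions: partitions are keyed by date, the join is an inner join on one key, the only aggregate is a SUM, and all function and column names are hypothetical.

```python
from collections import Counter


def select_partitions(partitions, start, end):
    """Keep only fact table partitions whose date is inside the queried range."""
    return {d: rows for d, rows in partitions.items() if start <= d <= end}


def build_virtual_view(fact_rows, dimension, key):
    """In-memory hash join: hash the dimension table, probe it with fact rows."""
    index = {dim_row[key]: dim_row for dim_row in dimension}
    for row in fact_rows:
        match = index.get(row[key])
        if match is not None:  # assumed inner-join semantics
            yield {**row, **match}


def map_phase(fact_rows, dimension, key, predicate, group_col, measure):
    """Per-node work: build the view, scan it with a filter, aggregate locally."""
    local = Counter()
    for row in build_virtual_view(fact_rows, dimension, key):
        if predicate(row):
            local[row[group_col]] += row[measure]
    return local


def reduce_phase(local_results):
    """Global aggregation: merge the local aggregates from every node."""
    total = Counter()
    for local in local_results:
        total.update(local)
    return dict(total)
```

A driver would call `select_partitions` once, run `map_phase` for each selected partition (one per node in a real cluster), and pass the resulting counters to `reduce_phase`.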
-
Specification