Search This Blog

Sunday, April 28, 2013

Distributed or replicated table? And what is important when choosing the distribution key?

PDW v1/v2

For large tables usually we’re looking for tuning options like creating an index or having a good partition strategy in place. For the Parallel Data Warehouse (PDW) additional decisions have to be made for the table layout.

Distributed or replicated?

The first decision is about the way the table is stored on the compute nodes. There are two options:

  1. Replicated
    All data of the table is available on all compute nodes
  2. Distributed
    The data of the table is distributed between the compute nodes

Distributing table data between the compute nodes follows the real nature of the MPP system. In PDW this distribution is established using a hash function on a specific table column which is referred to as the distribution key. In the other hand, replicated tables have their full content available on every compute node.

Creating a table in one of the two modes is quite easy:

Distributed table Replicated table

    ID int NOT NULL,

    ID int NOT NULL,

But when do we choose a distributed or replicated table?

As a rule of thumb, you will want to create tables which contain reference data, or – as we say in the data warehouse environment - dimensions, as replicated tables if they are not too big. The reason is simple. The typical data warehouse query will be a star join between the fact and the dimension tables with where-conditions on columns of the dimension tables, grouping (group by) on columns of the dimension tables and aggregations based on columns of the fact tables. So, if we distribute the large fact tables in order to leverage the full power of the MPP engine, having the dimensions on each compute node allows the compute node to answer the query with out needing data from other compute nodes. Let’s take a look at the following query based on a customer dimension which is linked to a sales table:

select Sum(SalesAmount) from FactSales
inner join DimCustomer On FactSales.CustomerKey=DimCustomer.CustomerKey
where DimCustomer.Region='EMEA'

No matter on which key the FactSales table is distributed, having the DimCustomer table replicated means, that each compute node can individually compute the Sum of sales for the customers in the EMEA region. There still has to be a final aggregation for the results coming from each compute node, but in this case, this is just one line per compute node.

Also consider, that read/write is much faster with distributed tables (parallel process) compared to replicated table. This is one reason why replicated tables should be used for a smaller amount of data.


Choosing a good distribution key

The following aspects are important when choosing a distribution key:

  • What kind of workload do we have?
    (do we usually see lots of “atomic” reads, returning only a few rows, or do we more likely expect large scans and aggregates on the table)
  • What are the typically performed joins among the tables
  • What are the typically performed aggregations (group by) used on the tables?
  • How is the distribution key itself distributed?
    Choosing a distribution key which is unequally distributed will result in skew. The different distributions of the table should contain almost the same number of rows in order to have a good parallelization of queries over the compute nodes. If all the data sits on one compute node because of a bad distribution, this node becomes the bottleneck and you cannot expect a good performance.

I’m getting back to the challenge of finding a good distribution key in later posts.

Please keep in mind, that for both decisions, distributed vs. replicated and the distribution key, you don’t have to make a decision that lasts forever. In fact, it’s quite unlikely that you come up with the best solution at the very beginning. It’s quite easy to redistribute a table based on another distribution key or to turn a distributed table into a replicated one. For both scenarios, CTAS can be used. CTAS stands for Create Table As Select. This is quite similar to the Select into syntax on the SMP SQL Server. For example, if you want to change the distribution key of a table Sales you could follow these steps:

  • CTAS Sales to SalesNEW having the new distribution key
  • rename Sales to SalesBAK
  • rename SalesNEW to Sales
  • drop SalesBAK

Monday, April 22, 2013

Big Data and Analytics

PDW v2 | Big Data

In my former post about Big Data, I used a “definition” which can be abbreviated as

“data that is too big for analysis within the required time”

The key aspects of this phrase are:

  1. size of data
  2. time frame for the analysis
  3. complexity of analysis


The time frame can be real time, near time, a few hours or maybe even days. This depends on the business requirement. The size of data may get bigger than expected because you need additional data sources (for example external data from market places) for your analysis. But today I’d like to focus on the third bullet point: the complexity of analysis.

If you don’t have complex analysis requirements and if you have plenty of time, you can process terabytes of data without any big data issues. Remember that storing a huge amount of data is not the big problem. But retrieving the data and doing analysis on this data is much more challenging.

But what are complex analytical computations? In SQL we can do a lot of computations by aggregating detailed values (sum, average, min, max etc.). And for many of the typical business performance indicators, this works quite well. But what about the following tasks:

  • Frequency analysis and decompositions (Fourier-/Cosine-/Wavelet transformation) for example for forecasting or decomposition of time series
  • Machine learning and data mining, for example k-means clustering, decision trees, classification, feature selection
  • Multivariate analysis, correlation
  • Projections, prediction, future prospects
  • Statistical tests (for example chi-squared or binomial)
  • Trend calculations, predictions and probability for certain trends or scenarios
  • Complex models involving simulations (for example Monte Carlo simulation for risk analysis)
  • binomial, normal or other types of distributions and density functions


For example, a decomposition of a time series into its main components may look like this:


(Source: R, temperature in Nottingham taken from the datasets library)

Decomposing time series can helpful to analyze periodicity and trends of sales data for example. This could be important for calculating the effect of promotions or to understand seasonal effects.

And this is just one example. As long as you can only slice and dice on your existing data, you’re always looking at the past. But in order to derive ideas and guidance for future decisions, higher sophisticated methods are required than just sum/group by. Some people even say, that this is where Business Intelligence starts. Everything else is just an analysis of the past which is also important, but there is so much more to find. The current discussion about data scientists clearly shows the rising demand for getting more out of your data. And to be honest, having a data scientist working just with a tool like Excel is like having Dr. House using just a folding rule as medical instrument instead of all the sophisticated laboratory instruments and equipment…it doesn’t work.

So, there are a lot of calculations that go far beyond the capabilities of traditional SQL. Therefore, we usually need to load the data from our data warehouse into some kind of analytical or statistical tool which is specialized such calculations. The results can then be transferred back into the relational database tables. As the focus of such tools differs from the focus of databases, these tools are usually separated from the database but offer interfaces (for example ODBC or flat file) to load data. Common tools are R, SAS, MatLab, just to name a few of them. R (, for example, is a toolset for doing advanced calculations and research level statistical computing. R is open source and can easily be extended using packages (libraries). Today, a huge amount of such packages exists for all kinds of different tasks.

However, when it comes to Big Data, the process of unloading all the required data can be very time consuming. So for Big Data analytics it’s important to bring both worlds together. This would be the perfect match. For doing so, the following two options are most promising:

  1. Using Hadoop (Map/Reduce)
  2. Using In-Database Analytics



PDW v2 offers a seamless integration with Hadoop using Polybase. This makes it easy and fast to export data on a Hadoop infrastructure. Research level analytics can then be performed on the Hadoop cluster. For this purpose, R supports distributed analysis and Map/Reduce jobs using the HadoopStreaming library. But we’re still copying the data out to the analytical environment, right? Yes, but in this scenario, each infrastructure is used in an optimal way:

    • PDW for high-performance SQL queries to combine and arrange the data in the format needed for analytical engines (more like a NoSQL format, for example to prepare variables for data mining).
    • Hadoop for distributed parallel computing tasks using Map/Reduce jobs
    • High performance (massive parallel) data transfer between the MPP (PDW) and Hadoop.
    • Transparent access of the analytical results using SQL (seamless integration of relational and non-relational data with Polybase)

Preparing the data for analytics can be a complex and challenging process. Usually data from multiple tables needs to be joined and filtered. Using SQL is the best choice for this task. For example, for preparing call center data for a mining model, it may be necessary to create variables (single row of data) that contain the number of complaints per week over the last weeks. This can then be used to build a decision tree. In SQL, this task is easy and in an MPP environment, we get the best performance for this task. For the decision tree we need to perform a feature selection at each node of the tree. This involves statistical functions and correlation which reach far beyond SQL. Using the analytical environment is the best choice for such advanced calculations. The resulting decision tree (rules, lift chart, support probabilities etc.) can then be stored as a file on the Hadoop cluster and from there being queried or imported back into relational database tables using Polybase.


In-Database Analytics

Another approach is to operate the analytical engine on the same platform and on the same data as the MPP database system. This approach ties both worlds together in a very consistent way but it’s currently not available on the PDW (although it is on my personal wishing list). However, in other MPP environments, this approach is not uncommon. For example, in SAP HANA you can write stored procedures in R just like this





The function body is then standard R code using R syntax, not SQL.

Typical features for In-Database Analytics include:

  • Analytical stored procedures
  • In-database analytics: direct access to database tables and views from the analytical engine without needing to load/unload the data
  • Tables/Views as parameters for the analytical functions (for example R data frames)
  • Full utilization of in-memory capabilities
  • Full utilization of the parallel query engine


Conclusion: In order to perform sophisticated analysis based on your BI data, SQL is not sufficient. Specialized toolsets like R are the the best solution. However, when it comes to Big Data, loading/unloading the data into these toolset may not be efficient anymore. A closer integration is necessary. Using Hadoop or In-Database Analytics are promising approaches for this scenario.

Saturday, April 6, 2013

What’s the buzz about MPP Data Warehouses (part 2)?

PDW v1/v2

In my first post I wrote about the need of a consequently tuned and aligned database server system in order to handle a high data warehouse workload in an efficient way. A commonly chosen implementation for this is a massive parallel shared nothing architecture. In this architecture your data is distributed among several nodes, each with their own storage. A central node processes incoming queries, calculates the parallel query plan and sends the resulting queries to the compute nodes.In a simplified form, this architecture looks as shown below:


Since different vendors choose different detail strategies, from now on, I’m focusing on the Microsoft Parallel Data Warehouse, or in short, the Microsoft PDW. The PDW is Microsoft’s solution for MPP data warehouse systems. The PDW ships as an appliance, i.e. as a pre-configured system (hard- and software) of perfectly compatible and aligned components, currently available from HP and DELL.

What happens if data is loaded into such a system? Let’s assume we have a table with 6 rows of sales and for simplicity, let’s assume we only have two compute nodes. In order to distribute the data among the compute nodes, a distribution keys needs to be chosen. This key will be used to determine the compute node for each row. Why don’t we just do a round robin distribution? I’m getting back to this point later in this post. The distribution key (table column) is used in a hash function to find a proper node. The hash function takes into account the data type of the distribution key, as well as the number of distributions. Actually, in PDW the table is also distributed on the compute node itself (8 different tables on different files/file groups) to get the optimal usage of the compute node’s cores and the optimal throughput to the underlying storage. For our example, let’s assume that the date values hash to the nodes as shown in this illustration:


As you see, each row of data is routed to a specific compute node (no redundancy). Doesn’t make this the compute node a single point of failure in the system?  Actually no, because of the physical layout of the PDW. In PDW v2 two compute nodes share one JBOD storage system, one of them communicating actively with the JBOD, the other using the infiniband network connection. The compute nodes itself are “normal” SQL Server 2012 machines running on Hyper-V. If a compute node fails, the data is still reachable using the second compute node that is attached to this JBOD. The compute nodes form an active/passive cluster, therefore the spare node can take over, if a node fails. The damaged node may easily be repaired or replaced. A Hyper-V image for a compute node sits on the management node (which I omitted in the illustration above). And again, this is just a very broad overview of the architecture. You can find very detailed information on the technology here:

With the example above, what happens if we query a single date? Since we distributed the table on the date, a single compute node contains the data for this query. The control node can pass the query to the nodes and has no more action to take. The compute node containing the data can directly stream the data to the client. The same would happen if we run a query that groups by Date (and potentially filters by some other columns). Now both compute nodes can separately compute the result and stream the result to the client. What you see from this example is

  • In this case, two machines work in parallel and fully independently from each other
  • Since we distributed on Date and the query uses Date in the grouping, no post processing is necessary (we call this an aggregation compatible query)
    (if the distribution would have happened based on a round robin approach, there would never by an aggregation compatible query)
  • In order to get the best performance in this case, it’s important that the data is equally distributed between the compute nodes. In the worst case of all the data being queried sitting on only one of the compute nodes, this one node would have to do the full work. Choosing a proper distribution key can be challenging. I’m getting back to this in a subsequent post.

What happens if we run a query like the following?

select Sum([Sales Amount]) from Sales

Again each compute node can compute the individual partial result but now these results need to be send to the control node to calculate the final result (so called partition move). However, in this example, the control node gets much less rows to process compared to the total amount of rows. Imagine millions or billions of rows being distributed to the compute nodes. The control node in this example would only get two rows with partial results (as we have two compute nodes in our example). So, this operation still fully benefits from the parallel architecture.


And this works for most kind of aggregations. For example, if you replace sum([Sales Amount]) with avg([Sales Amount]), the query optimizer would ask the compute nodes for the sum and count and compute the average in the final step.

Ok, usually data models are more complicated (even in a data warehouse) than a single table. In a data warehouse we usually find a star (or snowflake) relationship between facts and dimensions. For the sales table above, this could look like this:


What happens now, if the query above is filtered by the product group, which is an attribute of the product dimension?

select sum([S.Sales Amount]) from Sales S
inner join Product P on S.ProductKey=P.ProductKey
where P.ProductGroup='X'

How should we distribute the product table on the compute nodes in order to get good query performance? One option would be to distribute on the same key as the Sales. If we can do so, each compute node would see all products that are related to the sales that sit on this compute node and therefore answer the query without needing any data from other nodes (we would call this a distribution compatible query). However, the date is not a column in the product table (this wouldn’t make sense) so we cannot distribute the products in this way. It would work, if we had a SalesOrderHeader and SalesOrderDetail table, both joined and distributed on a SalesOrderID. But for the product dimension (as for most other dimensions too) we can go for a more straightforward approach. Fortunately, in a data warehouse, dimensions usually contain very few rows compared to the fact tables. It’s not uncommon to see over 98% of the data in fact tables. So for the PDW, it would make no difference if we put this table on all compute nodes. We call this a replicated table, while the Sales table itself is a distributed table. By making the Sales table distributed and the dimensions replicated, each compute node can answer queries that filter or group on dimension table attributes (columns) autonomically without needing data from other compute nodes.



Of course, this is just a very brief overview to show the basic concept. If we need to scale the machine, we could add more compute nodes and (after a redistribution of the data) can easily benefit from the higher computing power. This means we can start with a small machine and add more nodes as required which gives a great scalability. For the PDW v2 you can scale from about 50TB to about 6PB with a linear performance gain.

Also, PDW v2 offers a lot more features. Especially I’d like to mention the clustered column store index (CCI), which is a highly compressed, updatable in-memory storage of tabular data. Together with the parallel processing of the compute nodes this gives an awesome  performance when querying data from the PDW. Also the seamless integration with Hadoop (via PolyBase) allows us to store unstructured data in a Hadoop file system and query both sources transparently from the PDW in the well known SQL syntax without IT needing to transfer the data into the relational database or to write map-reduce jobs.

Again, there is much more to read about the PDW. A good starting point is the PDW website:



Data Warehouses with large amount of data have challenges that go far beyond just storing the data. Being able to query and analyze the data with a good performance requires special considerations about the system architecture. When dealing with billions of rows, classical SMP machines can easily reach their limit. The MPP approach, that distributes data on multiple, independent compute nodes can provide a robust and scalable solution here. The Microsoft Parallel Data Warehouse is a good example for this approach that also includes features like in-memory processing (clustered column store index in v2) and a transparent layer on both structured and unstructured data (Hadoop).