
Sunday, May 11, 2014

Practical Clustered Columnstore Index on Parallel Data Warehouse (part 2)

PDW 2012 | APS 2012

In part 1 of this topic, I showed how basic insert operations on a CCI table are handled internally. As we saw, inserting large batches causes the CCI to compress the data immediately (apart from the remaining rows that are still kept in the delta store), while small batches lead to closed row groups that are compressed asynchronously in the background. Now I'd like to take a closer look at update operations and at how row groups are created per distribution within the CCI table.

Step 9

What happens when an update operation hits an already compressed row? Let’s try:

update CCITest set val='Test X' where val='Test del'

Since compressed rows in the columnstore cannot be physically updated, the CCI marks the existing row as ‘deleted’ and inserts the new values as a new row. Therefore you see one deleted row (I included that column from the meta data query here) and one new row in the open row group (which now contains 51,425 rows instead of the 51,424 before):


[Screenshot: meta data query result]
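If you want to keep an eye on deleted rows across the whole table, the meta data query from above can be aggregated per partition. A sketch:

-- Sketch: deleted rows per partition (same joins as the meta data query above)
SELECT
    CSRowGroups.partition_number,
    SUM(CSRowGroups.total_rows) AS total_rows,
    SUM(CSRowGroups.deleted_rows) AS deleted_rows
FROM sys.objects AS o
JOIN sys.indexes AS i
    ON o.object_id = i.object_id
JOIN sys.pdw_index_mappings AS IndexMap
    ON i.object_id = IndexMap.object_id
    AND i.index_id = IndexMap.index_id
JOIN sys.pdw_nodes_indexes AS NI
    ON IndexMap.physical_name = NI.name
    AND IndexMap.index_id = NI.index_id
JOIN sys.pdw_nodes_column_store_row_groups AS CSRowGroups
    ON CSRowGroups.object_id = NI.object_id
    AND CSRowGroups.pdw_node_id = NI.pdw_node_id
    AND CSRowGroups.index_id = NI.index_id
WHERE o.name = 'CCITest'
GROUP BY CSRowGroups.partition_number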

All subsequent update or delete operations on this row are now written to the delta store. Deleted rows cause a kind of fragmentation in the CCI. While one deleted row is nothing to worry about, you should carefully monitor the number of deleted rows in your CCI tables (for example with a query like the one sketched above). If you find many deleted rows, you should defragment the table using CTAS (as CTAS is always the answer…) or an index rebuild. CTAS copies the data over to a new table, thus resolving the fragmentation. However, this does not guarantee that the data is in compressed form after the CTAS: CTAS goes through the same delta store mechanism we discovered before, and as we’ve seen, this may result in row groups that are still open and waiting for more rows of data to arrive.
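A minimal CTAS defragmentation sketch, assuming the table layout from step 1 of part 1 (the _new/_old names are just for illustration), might look like this:

create table CCITest_new
with (
    distribution = hash(id),
    clustered columnstore index,
    partition (id range right for values (100))
)
as select id, val from CCITest

-- swap the tables and drop the fragmented original
rename object CCITest to CCITest_old
rename object CCITest_new to CCITest
drop table CCITest_old

The other option for defragmenting a CCI table is to run an index rebuild: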

alter index all on CCITest rebuild partition = 1
-- or alternatively: partition=ALL

One thing to consider is that this statement requires an exclusive lock on the table. Currently we’re using partition switching to move new data into our fact tables, and therefore we’re running index rebuild/reorganize operations before switching the partition into the final fact table.
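As a sketch of this loading pattern (the staging table CCITest_staging is hypothetical and assumed to have the same structure, distribution and partitioning as the target):

-- rebuild first, so the switched-in partition contains only compressed row groups
alter index all on CCITest_staging rebuild partition = 1
-- then switch the partition into the final fact table
alter table CCITest_staging switch partition 1 to CCITest partition 1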

Step 10

Ok, back to the start. How many open, closed, and compressed row groups can we have in a CCI table? While we will find at most one open row group per partition and distribution, we can have as many closed and compressed row groups as needed. Take a look at the following example (starting with a truncate to reset our table):

truncate table CCITest

declare @i int = 1
while @i < 50
begin
    insert into CCITest select top 100000 1, 'Test' from MyLargeTable
    set @i = @i + 1
end

Immediately after the statement is executed, the result of the meta data query looks like this:

[Screenshot: meta data query result]

Since we inserted in small batches, the open row group is used until it reaches the limit of 1,048,576 rows. After that, it is closed and a new open row group is created.

Again, after a while, the tuple mover process begins picking up the closed row groups. The following screenshot was taken while the tuple mover was still processing the table:

[Screenshot: meta data query result]

And finally, when the tuple mover is finished, the closed row groups are all compressed:

[Screenshot: meta data query result]

Step 11

Now let’s try something different. As I said before, row groups are created per distribution and partition. This time we’re going to perform 16 inserts on a two compute node appliance (each node with 8 distributions, giving 16 distributions in total). I adjusted the id (hash column) so that we hit each of the 16 distributions:

truncate table CCITest
insert into CCITest select top 100000 0, 'Test' from MyLargeTable
insert into CCITest select top 100000 1, 'Test' from MyLargeTable
insert into CCITest select top 100000 2, 'Test' from MyLargeTable
insert into CCITest select top 100000 3, 'Test' from MyLargeTable
insert into CCITest select top 100000 4, 'Test' from MyLargeTable
insert into CCITest select top 100000 5, 'Test' from MyLargeTable
insert into CCITest select top 100000 6, 'Test' from MyLargeTable
insert into CCITest select top 100000 7, 'Test' from MyLargeTable
insert into CCITest select top 100000 8, 'Test' from MyLargeTable
insert into CCITest select top 100000 9, 'Test' from MyLargeTable
insert into CCITest select top 100000 10, 'Test' from MyLargeTable
insert into CCITest select top 100000 11, 'Test' from MyLargeTable
insert into CCITest select top 100000 12, 'Test' from MyLargeTable
insert into CCITest select top 100000 13, 'Test' from MyLargeTable
insert into CCITest select top 100000 14, 'Test' from MyLargeTable
insert into CCITest select top 100000 15, 'Test' from MyLargeTable

As you see, for each distribution a new row group in status ‘open’ is created.

[Screenshot: meta data query result]
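As a side note, the same 16 inserts can be written more compactly with the loop pattern from step 10. A sketch:

declare @i int = 0
while @i < 16
begin
    insert into CCITest select top 100000 @i, 'Test' from MyLargeTable
    set @i = @i + 1
end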

Think about this for a moment: we already showed that we can load about 1 million rows into an open row group (as long as we’re using small batches of up to 100,000 rows). With one open row group per distribution, that means we could have about 16 million uncompressed rows in open row groups per partition as a worst case.

So, if you’re using a CCI on smaller tables (for example a dimension table) or on tables with many partitions, you can easily face situations where most of the table’s rows are uncompressed. Again, it’s important to monitor the number of uncompressed rows and to perform an alter index reorganize (for the closed row groups) or an alter index rebuild (if there are too many open row groups).
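One way to monitor this (a sketch derived from the meta data query above) is to sum up the rows per row group state; everything in state OPEN or CLOSED is not yet compressed:

-- Sketch: total rows per row group state
SELECT
    CSRowGroups.state_description,
    SUM(CSRowGroups.total_rows) AS total_rows
FROM sys.objects AS o
JOIN sys.indexes AS i
    ON o.object_id = i.object_id
JOIN sys.pdw_index_mappings AS IndexMap
    ON i.object_id = IndexMap.object_id
    AND i.index_id = IndexMap.index_id
JOIN sys.pdw_nodes_indexes AS NI
    ON IndexMap.physical_name = NI.name
    AND IndexMap.index_id = NI.index_id
JOIN sys.pdw_nodes_column_store_row_groups AS CSRowGroups
    ON CSRowGroups.object_id = NI.object_id
    AND CSRowGroups.pdw_node_id = NI.pdw_node_id
    AND CSRowGroups.index_id = NI.index_id
WHERE o.name = 'CCITest'
GROUP BY CSRowGroups.state_description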

Step 12

Speaking of monitoring: a good start is the meta data query from above. However, you can also try running dbcc pdw_showspaceused. Let’s try:

dbcc pdw_showspaceused(CCITest)

What you can see here is that dbcc pdw_showspaceused only counts compressed rows (the first column reads 0 rows in each of the 16 distributions). Therefore, pdw_showspaceused is not a good tool for determining the number of uncompressed rows in a CCI table.

[Screenshot: dbcc pdw_showspaceused result]

Step 13

After running an index rebuild on the table, dbcc pdw_showspaceused shows the correct number of rows, since all rows are now stored in compressed row groups:

alter index all on CCITest rebuild partition = 1
dbcc pdw_showspaceused(CCITest)

[Screenshot: dbcc pdw_showspaceused result]


Summary

CCI is a powerful feature of the PDW 2012 appliance. However, you should monitor your CCI tables carefully for open/closed row groups, because the data in such row groups is not yet compressed, and depending on the table layout (for example the number of partitions) you may find a lot of rows there. Maintenance tasks (index reorganize, index rebuild) or a smart loading strategy (for example, performing the rebuild before the partition with the new data is switched into the final table) are the countermeasures to consider here. If your CCI table also has to handle delete or update operations, you may have to deal with fragmentation (the number of deleted rows in the CCI) as well. Consider using CTAS and partition switching patterns rather than running delete or update statements that involve a lot of rows. If this is not possible, monitoring fragmentation is important, and from time to time you will have to defragment the CCI table (CTAS each partition to a new table or use an index rebuild).

Saturday, May 3, 2014

Practical Clustered Columnstore Index on Parallel Data Warehouse (part 1)

PDW 2012

Together with the parallel query engine, the clustered columnstore index (CCI) gives the Parallel Data Warehouse (PDW) outstanding query performance. As in SQL Server 2014, data is natively stored in a compressed format, giving these advantages:

  • less space needed for storing large amounts of data
  • less IO needed to read it
  • outstanding query performance because it is memory optimized (xVelocity)


What makes the CCI especially useful is that the table remains writable, so you can perform any kind of loading operation (inserts, partition switching) on the table without any need to recreate the columnstore index afterwards. In fact, the columnstore index IS the table (hence the name clustered columnstore index).

However, when data is written to the CCI, it is not always compressed immediately. In short, newly inserted data is split up into row groups. The status of a row group may be:

OPEN – The rows are stored in a row store (a ‘normal’ table, the so-called delta store). PDW is still waiting for more rows to arrive before the rows are compressed into CCI storage.
CLOSED – The current row store buffer (delta store) is full (at 1,048,576 rows of data). This buffer will be compressed asynchronously by a background process (the ‘tuple mover’ process).
COMPRESSED – The row group is really compressed in the CCI (the final stage).

You can find more information on this including some illustrations in the PDW help file (look for ‘clustered columnstore index’).

In order to explain the nature of CCI write operations on the PDW, let’s go through some practical tests inserting, deleting and updating rows under different scenarios. You can find a similar example in the PDW help file but I wanted to cover more aspects here.

After each operation we want to see what the delta store looks like. I’m using the query from the meta data queries section of the chm file for this purpose:

-- Show size of columnstore index
SELECT
    CSRowGroups.pdw_node_id,
    CSRowGroups.partition_number,
    CSRowGroups.row_group_id,
    CSRowGroups.state_description,
    CSRowGroups.total_rows,
    CSRowGroups.deleted_rows,
    CSRowGroups.size_in_bytes
FROM sys.objects AS o
JOIN sys.indexes AS i
    ON o.object_id = i.object_id
JOIN sys.pdw_index_mappings AS IndexMap
    ON i.object_id = IndexMap.object_id
    AND i.index_id = IndexMap.index_id
JOIN sys.pdw_nodes_indexes AS NI
    ON IndexMap.physical_name = NI.name
    AND IndexMap.index_id = NI.index_id
JOIN sys.pdw_nodes_column_store_row_groups AS CSRowGroups
    ON CSRowGroups.object_id = NI.object_id
    AND CSRowGroups.pdw_node_id = NI.pdw_node_id
    AND CSRowGroups.index_id = NI.index_id
WHERE o.name = 'CCITest'
ORDER BY 1, 2


Step 1

For our tests, let’s first create a distributed table. Although not recommended, I chose the same column here for both partitioning and distribution.

create table CCITest (
    id int NOT NULL,
    val nvarchar(20) NOT NULL
)
with (
    distribution = hash(id),
    clustered columnstore index,
    partition (id range right for values (100))
)

Checking the row groups using the meta data query from above shows no results (no row groups are created in advance). This is also what we see after a truncate table operation:

[Screenshot: meta data query result]

Step 2

We now insert one row of data into one partition of the table:

insert into CCITest values(1,'Test')

As expected, this row goes to the delta store, causing an open row group. Please note that this row is not yet compressed but waiting for more rows to arrive (in order to use CCI compression efficiently).

[Screenshot: meta data query result]

Step 3

Let’s insert another row of data into another partition of the table:

insert into CCITest values(100,'Test')

Since row groups are created per distribution and partition, we have created a second open row group. Both row groups are waiting for more rows of data before they will be compressed.

[Screenshot: meta data query result]

Step 4

Now we delete the recently inserted row of data:

delete from CCITest where id=100;

Since the row is not yet compressed, it is simply removed from the delta store:

[Screenshot: meta data query result]

Step 5

Ok, now let’s insert 100,000 rows of data into the first partition of this table. I’m using a table “MyLargeTable” here, which can be any table containing at least 100,000 rows:

insert into CCITest select top 100000 1, 'Test' from MyLargeTable

Together with the row from above, this gives 100,001 rows in the first row group. Please note that the row group still isn’t compressed (status ‘OPEN’).

[Screenshot: meta data query result]

Step 6

Let’s empty the table (using a truncate) and then insert 200,000 rows of data into the first partition:

truncate table CCITest
insert into CCITest select top 200000 1, 'Test' from MyLargeTable

What you see here is that a 200,000 row insert causes the CCI to compress the data automatically (and synchronously). Status ‘COMPRESSED’ means that the data is now stored natively in the columnstore optimized format. So whenever a sufficiently large batch (more precisely, 102,400 rows or more) is written to a row group, the rows are compressed automatically and synchronously.

[Screenshot: meta data query result]
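If you want to verify this threshold yourself, here is a minimal sketch (assuming the 102,400 row boundary applies exactly as described):

truncate table CCITest
-- 102,400 rows in a single batch: should be compressed immediately
insert into CCITest select top 102400 1, 'Test' from MyLargeTable

truncate table CCITest
-- one row less: should end up in an open delta store row group instead
insert into CCITest select top 102399 1, 'Test' from MyLargeTable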

Step 7

We empty the table again (truncate) and now insert 10 × 100,000 rows of data into the table (I tag one row of the first 100,000 with the value ‘Test del’ for later use):

truncate table CCITest
insert into CCITest select top 99999 1, 'Test' from MyLargeTable
insert into CCITest select top 1 1, 'Test del' from MyLargeTable
insert into CCITest select top 100000 1, 'Test' from MyLargeTable
insert into CCITest select top 100000 1, 'Test' from MyLargeTable
insert into CCITest select top 100000 1, 'Test' from MyLargeTable
insert into CCITest select top 100000 1, 'Test' from MyLargeTable
insert into CCITest select top 100000 1, 'Test' from MyLargeTable
insert into CCITest select top 100000 1, 'Test' from MyLargeTable
insert into CCITest select top 100000 1, 'Test' from MyLargeTable
insert into CCITest select top 100000 1, 'Test' from MyLargeTable
insert into CCITest select top 100000 1, 'Test' from MyLargeTable

Although we inserted many more rows than before, the rows are not compressed this time; instead, we now have one open row group with 1,000,000 rows of data:

[Screenshot: meta data query result]

The difference from the example above, where inserting 200,000 rows caused an immediate compression, is that we are now using smaller batches. Remember that our table now contains 1 million rows, all in uncompressed state (“slow” access). Please note that since we’ve always used the same id (hash column), all these rows are stored in the same distribution.

Step 8

In order to find out whether this goes on forever, we insert another 100,000 rows of data into the table:

insert into CCITest select top 100000 1, 'Test' from MyLargeTable

What we now observe is that the open row group was closed after reaching 1,048,576 rows. The remaining rows are inserted into a freshly created open row group:

[Screenshot: meta data query result]

Status ‘CLOSED’ means the rows are still in uncompressed row storage. Compression is now performed asynchronously by a background task (the ‘tuple mover’ process), which runs every 5 minutes. So after a short moment, our table looks like this:

[Screenshot: meta data query result]

As you can see, the tuple mover has now compressed the ‘CLOSED’ row group. If you do not want to wait, you can also run this statement:

alter index all on CCITest reorganize partition = 1

In part 2 of this post, I continue these examples, showing update operations and how row groups are created per distribution (and partition).