Thursday Feb 04, 2016

Hadoop Compression. Compression rate. – Part1.

Compression codecs.

Text files (CSV with "," delimiter):

Codec Type   Average rate   Minimum rate   Maximum rate
bzip2        17.36          3.88           61.81
gzip         9.73           2.9            26.55
lz4          4.75           1.66           8.71
snappy       4.19           1.61           7.86
lzo          3.39           2              5.39

RC File: 

Codec Type   Average rate   Minimum rate   Maximum rate
bzip2        17.51          4.31           54.66
gzip         13.59          3.71           44.07
lz4          7.12           2              21.23
snappy       6.02           2.04           15.38
lzo          4.37           2.33           7.02

Parquet file:

Codec Type   Average rate   Minimum rate   Maximum rate
gzip         17.8           3.9            60.35
snappy       12.92          2.63           45.99

[Read More]

Using Spark(Scala) and Oracle Big Data Lite VM for Barcode & QR Detection

Big Data and Scalable Image Processing and Analytics

Guest post by Dave Bayard - Oracle's Big Data Pursuit Team 

One of the promises of Big Data is its flexibility to work with large volumes of unstructured types of data such as images and photos. In today's world there are many sources of images, including social media photos, security cameras, satellite images, and more. Many kinds of image processing and analytics are possible, from optical character recognition (OCR) and license plate detection to barcode detection, face recognition, geological analysis, and more. And there are many open source libraries, such as OpenCV, Tesseract, and ZXing, available to leverage.

This blog and demonstration was built to show that scalable image analytics does not need to be difficult. Our primary goal in this blog is to use the Oracle Big Data Lite VM environment to demonstrate how to take an open source library and combine it into a Spark/Scala application. In particular, we will use Spark alongside the ZXing (Zebra Crossing) library to detect barcodes and QR Codes from a set of image files.

It should also be noted that instead of writing our own Spark application, we could have leveraged Oracle's Multimedia Analytics framework, which comes as a feature of Oracle Big Data Spatial and Graph. For instance, the Multimedia Analytics framework could let us easily support videos as well as still images. In a later blog post, we will extend this example and show how to make it work with the Multimedia Analytics framework. For now, you can learn more about the Multimedia Analytics framework here: http://docs.oracle.com/cd/E65728_01/doc.43/e67958/


This blog has the following objectives:

  1. Learn about the ZXing library for barcode and QR code detection.
  2. Learn how to build and run a simple Spark(Scala) application using the Oracle Big Data Lite VM.
  3. Learn how to build and run a Spark(Scala+Java) application using the ZXing libraries.




Above is an example of a photo containing a QR code. One potential application could involve going through a large set of photos, identifying which photos contain QR codes, and extracting the content of those QR codes. Not everyone will have a set of photos with QR codes in them, nor a need to scan them in bulk, but the concepts of this blog (showing how to use an open source image library alongside Spark and Scala) should still apply; just substitute the QR code detection libraries with the image processing libraries of your choice.

Initial Oracle Big Data Lite VM Setup:

This demonstration uses the Oracle Big Data Lite VM version 4.3.0.1, which is available here: http://www.oracle.com/technetwork/database/bigdata-appliance/oracle-bigdatalite-2104726.html . Version 4.3.0.1 of the VM includes CDH 5.4.7, Spark 1.3.0, and Scala 2.10.4.

Once you have the Big Data Lite VM downloaded, imported, and running, click on the "Refresh Samples" icon on the VM desktop to refresh the samples. At this point, you should find the files needed for this blog under your /home/oracle/src/Blogs/SparkBarcode directory.



[Note: If you want access to the files referenced in this blog outside of the Oracle Big Data Lite VM, you can find them here: https://github.com/oracle/BigDataLite/Blogs/SparkBarcode ]

Now run the setup.sh script located at /home/oracle/src/Blogs/SparkBarcode. The setup.sh script downloads the necessary files for the open source ZXing and SBT libraries and copies some sample image files into HDFS.



ZXing (Zebra Crossing) for Barcode Detection:

ZXing (pronounced "Zebra Crossing") is an open source library that does one-dimensional (barcode) and two-dimensional (QR code) image detection. It is written in Java. You can find out more at the project website: https://github.com/zxing/zxing

As a quick example of ZXing, we can use the ZXing project's hosted web application to demonstrate its functionality. In the Big Data Lite VM, open the Firefox browser and go to http://zxing.org/ . On the web page, click on the "Browse" button and use the file browser to open the /home/oracle/src/Blogs/SparkBarcode/images/test.jpg file. Then click the "Submit Query" button on the web page.



The test.jpg file is a photo containing a two-dimensional QR code. When you submit the image, the web application should return the decoded contents of the QR code:



The ZXing project has kindly made available the source code for the web application. To understand how to interact with the ZXing API, we can look at how the web application worked and focus on the source of the DecodeServlet class, which is available here: https://raw.githubusercontent.com/zxing/zxing/master/zxingorg/src/main/java/com/google/zxing/web/DecodeServlet.java

In particular, navigate to the processImage() method, which illustrates how the ZXing APIs (such as MultipleBarcodeReader, MultiFormatReader, etc.) are used:

  private static void processImage(BufferedImage image,
                                   HttpServletRequest request,
                                   HttpServletResponse response) throws IOException, ServletException {

    LuminanceSource source = new BufferedImageLuminanceSource(image);
    BinaryBitmap bitmap = new BinaryBitmap(new GlobalHistogramBinarizer(source));
    Collection<Result> results = new ArrayList<>(1);

    try {

      Reader reader = new MultiFormatReader();
      ReaderException savedException = null;
      try {
        // Look for multiple barcodes
        MultipleBarcodeReader multiReader = new GenericMultipleBarcodeReader(reader);
        Result[] theResults = multiReader.decodeMultiple(bitmap, HINTS);
        if (theResults != null) {
          results.addAll(Arrays.asList(theResults));
        }
      } catch (ReaderException re) {
        savedException = re;
      }
  
      if (results.isEmpty()) {
        try {
          // Look for pure barcode
          Result theResult = reader.decode(bitmap, HINTS_PURE);
          if (theResult != null) {
            results.add(theResult);
          }
        } catch (ReaderException re) {
          savedException = re;
        }
      }
  
      if (results.isEmpty()) {
        try {
          // Look for normal barcode in photo
          Result theResult = reader.decode(bitmap, HINTS);
          if (theResult != null) {
            results.add(theResult);
          }
        } catch (ReaderException re) {
          savedException = re;
        }
      }
  
      if (results.isEmpty()) {
        try {
          // Try again with other binarizer
          BinaryBitmap hybridBitmap = new BinaryBitmap(new HybridBinarizer(source));
          Result theResult = reader.decode(hybridBitmap, HINTS);
          if (theResult != null) {
            results.add(theResult);
          }
        } catch (ReaderException re) {
          savedException = re;
        }
      }
  
      if (results.isEmpty()) {
        try {
          throw savedException == null ? NotFoundException.getNotFoundInstance() : savedException;
        } catch (FormatException | ChecksumException e) {
          log.info(e.getMessage());
          errorResponse(request, response, "format");
        } catch (ReaderException e) { // Including NotFoundException
          log.info(e.getMessage());
          errorResponse(request, response, "notfound");
        }
        return;
      }

    } catch (RuntimeException re) {
      // Call out unexpected errors in the log clearly
      log.log(Level.WARNING, "Unexpected exception from library", re);
      throw new ServletException(re);
    }

    String fullParameter = request.getParameter("full");
    boolean minimalOutput = fullParameter != null && !Boolean.parseBoolean(fullParameter);
    if (minimalOutput) {
      response.setContentType(MediaType.PLAIN_TEXT_UTF_8.toString());
      response.setCharacterEncoding(StandardCharsets.UTF_8.name());
      try (Writer out = new OutputStreamWriter(response.getOutputStream(), StandardCharsets.UTF_8)) {
        for (Result result : results) {
          out.write(result.getText());
          out.write('\n');
        }
      }
    } else {
      request.setAttribute("results", results);
      request.getRequestDispatcher("decoderesult.jspx").forward(request, response);
    }
  }


We can observe that the code makes a series of attempts to identify barcodes/QR codes. If one attempt does not find a barcode, it attempts again using a different method or parameter/hint. This can help the code to better tolerate image detection challenges like variations in lighting, reflection, angle, resolution, image quality, etc. (If you are interested in other ways to use the ZXing APIs, an alternative example is the decode() method in the DecodeWorker class, which can be found here: https://github.com/zxing/zxing/blob/master/javase/src/main/java/com/google/zxing/client/j2se/DecodeWorker.java )
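
To make the core ZXing calls concrete outside of a servlet, here is a minimal, hedged Scala sketch of decoding a single image file. It assumes the ZXing core and javase jars are on the classpath, skips the retry/fallback logic shown above, and is an illustration only, not the code used later in this blog.

import java.io.File
import javax.imageio.ImageIO
import com.google.zxing.{BinaryBitmap, MultiFormatReader, ReaderException}
import com.google.zxing.client.j2se.BufferedImageLuminanceSource
import com.google.zxing.common.HybridBinarizer

object QuickDecode {
  def main(args: Array[String]): Unit = {
    val image = ImageIO.read(new File(args(0)))           // e.g. images/test.jpg
    val source = new BufferedImageLuminanceSource(image)  // wrap the BufferedImage for ZXing
    val bitmap = new BinaryBitmap(new HybridBinarizer(source))
    try {
      val result = new MultiFormatReader().decode(bitmap) // single attempt, no hints
      println("Format: " + result.getBarcodeFormat + "  Text: " + result.getText)
    } catch {
      case _: ReaderException => println("No barcode or QR code found")
    }
  }
}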

Our next step is to get started coding with ZXing by building a simple standalone Java application. Thanks to the setup.sh script you ran earlier, we have already downloaded the necessary ZXing java jar libraries, as described in the ZXing Getting Started guide at https://github.com/zxing/zxing/wiki/Getting-Started-Developing.

Our simple java standalone application will be based off the processImage() method from the web application. The source of the sample application can be found in the BarcodeDetector.java file, located in the /home/oracle/src/Blogs/SparkBarcode/SimpleJavaApp/barcodedemo subdirectory.

Essentially, we copied the web application's processImage() method and removed the dependencies on the HTTP request and response objects. Explore the source code to see what we mean.

Now run the "run_simple_java.sh" script to both compile and run our sample test program against the test image.



Our simple standalone java application using ZXing libraries is a success!

Spark Development: A First Scala Application

Our ultimate goal is to combine the open source ZXing library with Spark and run it using the resources of our CDH cluster. Specifically, we want to build a Scala application that calls the Java ZXing libraries and uses the Spark on YARN framework to run our barcode detection on a set of images in parallel. But before we attempt that final application, we will start with a simple Scala application that uses only Java's built-in libraries.

If you are new to Spark, you should check out http://spark.apache.org/docs/latest/quick-start.html . If you are new to the Scala language, you can find some quick language tips at http://www.artima.com/scalazine/articles/steps.html . This blog assumes you have some experience with working with basic Spark/Scala examples such as word count using the interactive spark-shell.

To help with our Scala application development, we will add the "sbt" utility to the Big Data Lite VM. SBT is frequently used to manage the build process for Scala applications, much like Maven is used with Java applications. The "setup.sh" script you ran earlier downloaded sbt for the Big Data Lite VM. If you want more information about SBT, see http://www.scala-sbt.org/
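
For orientation, a minimal sbt build definition for a Spark/Scala application on this VM might look roughly like the sketch below. This is an assumption-based illustration rather than the exact simple.sbt shipped with the samples; the versions simply mirror the VM's Spark 1.3.0 and Scala 2.10.4.

name := "Simple Scala Image Application"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"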

Another requirement is to prepare a directory structure for our Spark/Scala application. We will follow the template directory structure described in http://spark.apache.org/docs/latest/quick-start.html#self-contained-applications .

The directories and files for the simple application have been created for you and are located in the /home/oracle/src/Blogs/SparkBarcode/SimpleScalaApp subdirectory. They look like this:



There are 2 key files. The simple.sbt file is the build file that sbt uses and contains information such as dependencies on other libraries. The SimpleApp.scala file is the application source. The source looks like:

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

import java.awt.image.BufferedImage
import javax.imageio.ImageIO

object SimpleApp {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Scala Image App")
    val sc = new SparkContext(conf)
    
    val files = sc.binaryFiles("barcode/images/test.jpg")
    val imageResults = files.map(processSparkImage(_))
    imageResults.collect.foreach(
         result => println("\nFile:"+result._1+" Width:"+result._2+" Height:"+result._3)
                          ); //this println gets printed to the main stdout

    sc.stop()

    println("*")
    println("*")
    println("*")
    
  }
  
  def processSparkImage (
                    file: (String, org.apache.spark.input.PortableDataStream)
                   ) : (String, Int, Int)  =
  {
    println("In processSparkImage for "+file._1) //this println goes to the executor stdout
    val img: BufferedImage = ImageIO.read(file._2.open)
    val height = img.getHeight()
    println("Height is "+height+" for "+file._1)
    file._2.close

    return (file._1, img.getWidth(), img.getHeight)
  }
}
The simple application is based on the Spark quick start's example Scala standalone application. The big changes are that we have modified it to use Spark's sc.binaryFiles() instead of sc.textFile(), and we have created a processSparkImage() function. Our processSparkImage() function uses standard Java image APIs to extract the width and height of an image.

At the time of this writing, there does not seem to be much published about sc.binaryFiles(), so it is worth a little bit of extra explanation. The output of sc.binaryFiles is a set of tuples. The first element is the filename and the second element is a Spark PortableDataStream of the contents. In Scala notation, the body of processSparkImage() uses file._2 to point to the PortableDataStream and file._1 to point to the filename. The PortableDataStream can be used where you would use an InputStream.
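
As a hedged illustration (not part of the shipped example), something like the following could be pasted into spark-shell, assuming the sample images were copied into the barcode/images HDFS directory by setup.sh:

// Each element of the RDD is a (filename, PortableDataStream) tuple.
val files = sc.binaryFiles("barcode/images/*.jpg")

files.map { case (name, stream) =>
  val in = stream.open()                    // PortableDataStream can be used like an InputStream
  val img = javax.imageio.ImageIO.read(in)  // so ImageIO can read directly from it
  in.close()
  (name, img.getWidth, img.getHeight)
}.collect.foreach(println)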

The rest of the code is pretty standard stuff for initializing the SparkContext, etc.

Run the "build_simple_scala.sh" script. If this is your first time running the build script, be patient: it will take a dozen minutes or so, as the sbt tool does a one-time download of supporting libraries to prepare the Scala environment for Spark application compilation and development.



Once the build is done, run the "run_simple_scala.sh" script. Your output should look like this:



Notice that the simple application has printed out the width and height of the test image, as expected. Also notice that the run script has given you the URLs of the YARN, Hue, and Spark web UIs. In the terminal window, you can right-click on those URLs and choose "Open Link…" to easily view them. Below are screenshots of the Hue, YARN Resource Manager, and Spark History web UIs:











Building our Scala+ZXing Spark Application:

Now, we can move onto integrating the java ZXing library into our Scala application code. We have done so in the application located at /home/oracle/src/Blogs/SparkBarcode/ScalaBarcodeApp.

Let's look at the source code directory.



Notice that there are both java and scala subdirectories. Under the java subdirectory, we have a barcodedemo.BarcodeProcessor class with our version of the processImage() function from the simple Java application above.

Our scala code is BarcodeApp.scala. It looks like this:

/* BarcodeApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

import barcodedemo.BarcodeProcessor

import java.awt.image.BufferedImage
import javax.imageio.ImageIO

object BarcodeApp {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Scala ZXing Barcode App")
    val sc = new SparkContext(conf)
    
    val files = sc.binaryFiles(args(0))

    val imageResults = files.map(processSparkImage(_))

    imageResults.collect.foreach(
               result => println("\nFile:" + result._1 + "\n Results:" + result._2)
                                ) 
                          //this println gets written to the main stdout


    sc.stop()

    println("*")
    println("*")
    println("*")
    
  }
  
  def processSparkImage (
                    file: (String, org.apache.spark.input.PortableDataStream)
                   ) : (String, String)  =
  {
    println("In processSparkImage for "+file._1) //this println goes to the executor stdout

    val img: BufferedImage = ImageIO.read(file._2.open)
    file._2.close

    return (file._1, BarcodeProcessor.processImage(img))
  }

}


Notice that the Scala code imports our Java BarcodeProcessor class. The Scala processSparkImage() method calls the Java BarcodeProcessor.processImage() method, which returns the barcode information as a string.

You can also look at the main method of the Scala application. It defines an RDD using sc.binaryFiles() based on the path given by the first command-line argument, args(0). This allows us to test our application with one or many images simply by changing the command-line arguments of our run command. The application then calls the map() transformation with the processSparkImage() method, which causes the Spark executors to run processSparkImage() on each binary file. Finally, the Scala code collects the results and prints the output for each file.
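
One design note: collect() pulls every result back to the driver, which is fine for a couple of dozen images. As a hedged variation (not part of the shipped example), a much larger image set could write its results straight back to HDFS instead; the output path below is hypothetical.

// Write one "<filename><tab><barcode results>" line per image to HDFS
// rather than collecting everything to the driver.
val outputPath = "barcode/results"   // hypothetical HDFS path
imageResults
  .map { case (name, barcodes) => name + "\t" + barcodes }
  .saveAsTextFile(outputPath)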

We can also look at the barcode.sbt file for this application and notice that we've included dependencies for the necessary ZXing libraries from the central Maven repository, telling SBT to download them as needed for our build.

name := "Scala ZXing Barcode Application"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"

libraryDependencies += "com.google.zxing" % "core" % "3.2.1"
libraryDependencies += "com.google.zxing" % "javase" % "3.2.1"


Build our application by running "build_barcode.sh".



Then run our application. If you want to run against the single test.jpg image, use "run_barcode.sh". If you want to test against a set of approximately 25 images, use "run_barcode_many.sh".

Here is the start, middle, and finish of "run_barcode_many.sh":













You will notice that a barcode was not detected in every image. In some cases, there simply wasn't a barcode or QR code in the image. In other cases, there was a barcode, but the image may have been fuzzy, had too much glare, or suffered from some other problem.

Running on a real cluster:

To get a feel for the power of scalability, we can copy our Barcode application and deploy it onto real-world hardware. In our case, we will use an older 6-node Oracle Big Data Appliance X3-2. While the BDA X3-2 lacks the huge horsepower of the latest generation BDA X5-2, it can still give us a place to demonstrate scalability. In our example, we will run the ìrun_barcode_many.shî for 50 images against the Big Data Lite VM as well as against a 6-node X3-2 BDA.




The screenshot above shows details from running the Barcode app on the Big Data Lite VM on my laptop. It used 2 executors and ran in 1.3 minutes of clock time. The screenshot below shows details from running on the 6-node BDA X3-2. Notice the different number of Executors (in the Aggregated Metrics by Executor section). On the BDA, we used 50 executors and it took 5 seconds of clock time. Both scenarios used the same code and same set of images.



Notice how the Spark application was able to take advantage of the multiple machines and multiple CPUs of the BDA X3-2 to run in parallel and complete much faster (5 seconds versus 1.3 minutes).

Moving Beyond:

Hopefully, this was a good start for your journey into Spark and Image Processing. Here are some possible future paths you could take:
  • You could learn more about Spark. To do so, I liked the book "Learning Spark".
  • You could learn more about image detection and computer vision. To do so, I found the book "SimpleCV" to explain the concepts well (SimpleCV is Python-focused, but its explanations of the concepts are useful in any language).
  • You could experiment with other libraries like OpenCV or Tesseract. Cloudera has a blog example using Spark with the open source Tesseract OCR library: http://blog.cloudera.com/blog/2015/10/how-to-index-scanned-pdfs-at-scale-using-fewer-than-50-lines-of-code/
  • You could experiment with tweaking the sample Java processImage() code to work as a custom FrameProcessor for the Oracle Big Data Spatial and Graph Multimedia Analytics feature. This could be used, for instance, to scan videos for barcodes. To do so, you can use processImage() as part of an implementation of the oracle.ord.hadoop.mapreduce.OrdFrameProcessor class. See http://docs.oracle.com/cd/E65728_01/doc.43/e67958/GUID-4B15F058-BCE7-4A3C-A6B8-163DB2D4368B.htm or stay tuned for an upcoming blog/example.


NOTE: If you want to play around with the source files and make modifications, you should probably copy the SparkBarcode directory tree into a new directory outside of /home/oracle/src. This is because the "Refresh Samples" utility will wipe out the /home/oracle/src directory every time it runs.

Conclusion:

This blog has shown you how to do Barcode detection using the ZXing libraries. First, we illustrated ZXing in action by using the hosted web application at zxing.org. Next, we built and ran a simple standalone java application using the ZXing APIs. Then, we built a simple Spark(Scala) application that used the built-in java Image APIs (but not yet the ZXing APIs). We ran this Spark(Scala) application against the YARN cluster on our Big Data Lite VM. Then we built our final Spark(Scala) application which included our custom Java code that called the ZXing APIs. We ran this on our Big Data Lite VM YARN cluster for a set of sample images. Finally, we took the same code and images and ran them on a physical Big Data Appliance to show the benefit of scale-out parallelism across nodes and cpus.

Hopefully, this has made you more comfortable with working with tools like the Oracle Big Data Lite VM, Spark, Scala, sbt, and ZXing. Enjoy.


About the Author:

David Bayard is a member of the Big Data Pursuit team for Oracle North America Sales.

Tuesday Jan 26, 2016

Big Data SQL Quick Start. Offloading - Part2.

After reading the articles Big Data SQL Quick Start. Introduction and One fast Query All your Data, you know what Big Data SQL is, and you understand that it allows you to query data from Hadoop through Oracle Database. But you should also know that it's not just about reading data. Big Data SQL allows you to process data stored in HDFS locally and return to the database only the data relevant to the query. Let's imagine a simple diagram of a data management system that includes Oracle Database and Hadoop:



And now let's list some of the advantages of each tier:

1) Databases are good for:

a. Transactional workloads
b. Concurrent reads and writes
c. Storing critical data
d. Multiple joins and complex queries

2) Hadoop is good for:

a. Batch, non-concurrent workloads
b. Relatively simple queries
c. Horizontal scalability for those simple queries
d. Unstructured and semi-structured data


This list suggests that it would be nice to have a system that could store "raw" data on the Hadoop tier, accomplish all the rudimentary processing jobs there, and return only prepared, clean data to the database tier. Big Data SQL lets you accomplish exactly this. On the Hadoop tier it performs:
- Deserialization of the data. If you store data in a serialized format like AVRO, JSON, or Parquet, Big Data SQL will deserialize it on the Hadoop tier.
- Column pruning. If your table has 400 columns and your query selects only one column, Big Data SQL will prune the other 399 columns and return only the one you need.


- Datatype transformation. Oracle Database has its own datatypes, and datatype conversion is not a cheap operation. On the Hadoop tier, Big Data SQL transforms the data into the Oracle Database format.
- Applying functions. If you have filter predicates (like "id=…"), it's good to have that work done on the Hadoop tier. You can list all of the functions that can be evaluated on the Hadoop tier by running this query in Oracle:

SQL> SELECT * FROM v$sqlfn_metadata WHERE offloadable = 'YES'

From here on out, I'll call queries where most of the workload is done on the Hadoop tier "offloadable". Now, let's consider the example from my previous blog post (Big Data SQL Quick Start. Introduction):

SQL> SELECT min(w.ws_sold_time_sk)
FROM WEB_SALES w
WHERE w.ws_sold_date_sk = 2451047

Filtering is the main part of this query from the Big Data SQL perspective; it will sift out inappropriate rows and pass only eligible rows to the RDBMS side. Big Data SQL will also perform column pruning, which means that of all the columns in this table, it will pass back only the one we are going to aggregate. Let's check the execution plan and query statistics in OEM:



Here in Oracle Enterprise Manager we can also check the offload efficiency:


As an alternative to OEM, you can check execution statistics with the following query (run it after your query in your favorite SQL tool, like SQL Developer or SQL*Plus):

SQL> SELECT n.name,
round(s.value/1024/1024)
FROM v$mystat s,
v$statname n
WHERE s.statistic# IN (462,463)
AND s.statistic# = n.statistic#;


cell XT granule bytes requested for predicate offload 32768
cell interconnect bytes returned by XT smart scan 32

This is a classic example of a good query for Big Data SQL.

How do you determine that offloading didn't happen?

Before I answer this question, I’ll explain in more detail how offloading works. The picture below will help make things clear:

In the first step, Big Data SQL applies storage indexes, reads data from HDFS, and performs deserialization. The "External Table Service" actually has two modes for reading data: C or Java. You can specify the mode during table creation in the access parameter settings:


ACCESS PARAMETERS
( com.oracle.bigdata.datamode=java
)
or

ACCESS PARAMETERS
( com.oracle.bigdata.datamode=c
)

Use C mode when possible (not all data formats are available for C mode). In both cases, Big Data SQL reads an HDFS block, translates it to the Oracle format (which is needed by the Smart Scan subcomponent), and passes the data to Smart Scan. At the Smart Scan level, Big Data SQL performs column pruning and filtering, parses JSON and XML, and so on. The resulting data is pushed to the database tier over the network in native Oracle Database format.
How do you figure out whether your query is offloadable or not? The first step is to open SQL Monitoring in Oracle Enterprise Manager (this is true for debugging all Oracle queries) and have a look at the events.
Let's start with an example of a "proper" (offloadable) query (I call a case proper when we perform the bigger part of the workload on the storage side). I implemented a trivial PL/SQL function that actually does nothing:

SQL> create or replace function fnull(input number) return number is
Result number;
begin
Result:=input;
return(Result);
end fnull;


And then ran the following SQL:

SQL> SELECT fnull(MIN(WS_SOLD_DATE_SK)),
………
fnull(MIN(WS_NET_PROFIT))
FROM WEB_SALES

The table WEB_SALES has many columns (34 in total), and in my SQL I list all of them (though I show only two in this blog). The function MIN is offloadable, which means that we can execute it on the Hadoop side:

SQL> SELECT NAME,
offloadable,
AGGREGATE
FROM v$sqlfn_metadata
WHERE NAME = 'MIN'
AND AGGREGATE='YES'


NAME offloadable AGGREGATE
---- ----------- ---------
MIN  YES         YES


The data will then be filtered by Big Data SQL, the result will be transmitted to the database side, and our PL/SQL function will be applied on the database tier.
Let’s inspect what OEM shows us:



87% of the events are "user IO" and all of this IO is on the cell side ("cell external table smart scan"). 13% of the wait events are CPU waits (most probably PL/SQL execution). It is also useful to check the offloading statistics. We can do this in OEM:


Alternatively, we can run the following query in SQL Developer, SQL*Plus, or another SQL tool:

SQL> SELECT n.name,
round(s.value/1024/1024)
FROM v$mystat s, v$statname n
WHERE s.statistic# IN (462,463)
AND s.statistic# = n.statistic#;

This example shows an offloadable query, which means it's a good fit for Big Data SQL. In contrast, let's consider an anti-pattern query; for example, we could reverse the nesting of our functions like this:

SQL> SELECT MIN(fnull(WS_SOLD_DATE_SK)),
……
MIN(fnull(WS_NET_PROFIT))
FROM WEB_SALES

It looks very similar to the previous query and returns the same result, but the execution will be different. As soon as we wrap a column in PL/SQL (PL/SQL cannot be executed on the Hadoop tier), we have to move all the data to the database tier. OEM also shows different wait events:


You can observe only CPU waits in the screen above. We can click on the query details and see that Smart Scan returns all eligible bytes:

Avoid this case if possible, and try to perform as much of the query as possible on the Hadoop tier. The Hadoop tier still performs some work in this query: it transforms HDFS blocks to the database format and, for queries that do not reference all of the columns, prunes unused columns. It's just not doing all it can.

Database-side datatype transformation

OK, but what if the Big Data SQL agents on some node(s) become unavailable? Let's look at this case. We start by stopping Big Data SQL (via Cloudera Manager):



After this, run any query. Here's a simple one:

SQL> SELECT COUNT(1)
FROM STORE_SALES ss
WHERE ss.ss_sold_date_sk = 2452200

The first sign that offloading isn't happening will be slow query performance. Then, in OEM, you will find many "Application: External Procedure Call" events. Here is a screenshot from Enterprise Manager:


You can also check network utilization on the database side (it will be significant):

Restrictions for Big Data SQL.

Here is a brief set of restrictions:

1) Big Data SQL is full Oracle Database SQL. You can find all the details here: https://docs.oracle.com/database/121/SQLRF/toc.htm

2) Not all functions can be evaluated on the Hadoop tier. You can find the list of the functions that can be offloaded with the following view:

 SELECT NAME FROM v$sqlfn_metadata WHERE offloadable ='YES'

3) Even for non-offloadable operations, Big Data SQL will perform column pruning and datatype conversion (which saves a lot of resources).

4) All other (non-offloadable) operations will be done on the database side.

Monday Jan 25, 2016

Parallel Correlated Filters and Expressions in 12c

In the last post we looked at multiple parallelizers and how they can cause a SQL statement to use more PX servers than expected. You will notice that some types of statements behave differently in 12c compared to 11g in this regard. For these statements 12c does not use multiple parallelizers, thereby avoiding their negative consequences.

Correlated filters

Here is a simple query that checks if the sales table has any rows without a corresponding channel_id in the channels table.

SELECT /*+ parallel(2) monitor */ COUNT(*)
FROM sales s
WHERE NOT EXISTS
  (SELECT /*+ no_unnest */ 1 
  FROM channels c 
  WHERE c.channel_id=s.channel_id);

I have used the no_unnest hint on purpose for illustration purposes to force the optimizer to use a filter operation rather than unnesting the query and using a join between two tables.

First let's look at the 11g plan for this statement. Note that this plan is from a 12c database with optimizer_features_enable set to 11.2.0.4 in the session. You can click on the pictures to see them larger.

We see there are two parallelizers (Line Ids 3 and 7), each with one PX server set, so a total of 2 PX server sets are used for this statement. The first PX server set scans the sales table and sends the rows to the PX coordinator (Line Ids 4-6). The PX coordinator executes the subquery by using another PX server set (Line Ids 8-10), but does the filtering itself serially (Line Id 2). If we look at the "Activity %" column we see that nearly all the time was spent on Line Ids 2, 3, and 4, as the filter operation is done serially by the PX coordinator. In Line Id 4 we spent time sending 102M rows (indicated by the Actual Rows column) to the PX coordinator, and in Line Ids 2 and 3 we spent time getting these rows and running the filter operation.

If we run the same statement with optimizer_features_enable set to 12.1.0.2 to get the 12c behavior here is what we see.

In 12c we do not see two parallelizers anymore, we have only one parallelizer (Line Id 2) which has only one PX server set. The PX servers in this set scan the sales table (Line Ids 6, 7), then each PX server scans the channels table and does the filter operation for the rows it scanned from the sales table (Line Ids 5, 8). After the filter operation each PX server counts the rows (Line Id 4) and sends the result to the PX coordinator. If we look at the Timeline column for these two cases we see that the same query finished in 67 seconds in 11g and in 31 seconds in 12c.

You can see the SQL Monitor reports for these two examples here: 11g and 12c. You may see a blank page when you open these links if your browser is configured to reject unsafe scripts; this is because this blog site uses HTTPS while the SQL Monitor reports use HTTP links. In that case, please download the reports and open them locally.

Correlated expressions

The second type of SQL construct that behaves differently with regard to multiple parallelizers in 12c is scalar subquery correlated expressions used in select lists.

Here is a simple query that uses this type of a subquery in the select list.

SELECT /*+ parallel(2) */
     (SELECT c.channel_desc 
     FROM channels c 
     WHERE c.channel_id=s.channel_id) channel_desc
FROM sales s
WHERE cust_id<100000;

Here is the plan for this query with optimizer_features_enable set to 11.2.0.4.

We have two parallelizers (Line Ids 2, 5) and one PX server set under each parallelizer. One PX server set (Line Ids 6-8) scan the sales table and sends the rows to the PX coordinator. The PX coordinator uses another PX server set (Line Ids 2-4) to scan the channels table for the subquery.

Here is the 12c plan for the same statement.

Now we have only one parallelizer (Line Id 1) and one PX server set (Line Ids 2-6). The PX servers in this single set scan the sales table, each PX server scans the channels table for the subquery. 12c introduces a new operation called "Expression Evaluation" (Line Id 3) which indicates the subquery expression is evaluated by each PX server after scanning the sales table.

The elapsed times for this query in 11g and 12c are similar as you can see in the Timeline column.

You can see the SQL Monitor reports for these two examples here, 11g and 12c. Again, please download the files and open them locally if you see a blank page when you open the link.

In all of the examples here you will notice that the number of executions of the subquery does not match the number of rows from the sales table; this is because of scalar subquery caching. Oracle tries to cache the result of the subquery so that it does not need to run the subquery again for the same values from the sales table. This is also why the scan of the channels table is not active for the whole duration of the execution (as indicated by the Timeline column).

In all examples in this post I have used the Swingbench SH schema with all indexes disabled for illustration purposes.

For correlated filters and scalar subquery expressions in the select list 12c provides improvements so that statements do not use multiple parallelizers. This prevents such statements from the implications of multiple parallelizers we mentioned in the last post. Note that you can still see cases where these optimizations do not kick in depending on the complexity of the statement.

Tuesday Jan 19, 2016

Big Data SQL Quick Start. Introduction - Part1.

Today I am going to explain the steps required to start working with Big Data SQL. It's really easy! I hope that after this article you will all agree with me. First, if you want to get caught up on what Big Data SQL is, I recommend that you read these blogs: Oracle Big Data SQL: One Fast Query, Big Data SQL 2.0 - Now Available.

The above blogs cover the design goals of Big Data SQL. One of those goals is transparency: you just define a table that links to some directory in HDFS or some table in HCatalog and continue working with it like a regular Oracle Database table. It's also useful to read the product documentation.

Your first query with Big Data SQL

Let's start with the simplest example and query data that is actually stored in HDFS via Oracle Database using Big Data SQL. I'm going to begin by checking the data that actually lies in HDFS. To accomplish this, I run the Hive console and check the Hive table DDL:

hive> show create table web_sales;
OK
CREATE EXTERNAL TABLE web_sales(
  ws_sold_date_sk int,
  ws_sold_time_sk int,
  ....
  ws_net_paid_inc_ship float,
  ws_net_paid_inc_ship_tax float,
  ws_net_profit float)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '|'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://democluster-ns/user/hive/warehouse/tpc_ds_3T/web_sales'

From the DDL statement, we can see the data is text files (CSV), stored on HDFS in the directory:

/user/hive/warehouse/tpc_ds_3T/web_sales

From the DDL statement we can conclude that fields are terminated by "|". Trust, but verify – let's check:

# hdfs dfs -ls /user/hive/warehouse/tpc_ds_3T/web_sales|tail -2
... hive 33400655 2015-05-11 13:00 /user/hive/warehouse/tpc_ds_3T/web_sales/part-01923
... hive 32787672 2015-05-11 13:00 /user/hive/warehouse/tpc_ds_3T/web_sales/part-01924
# hdfs dfs -cat /user/hive/warehouse/tpc_ds_3T/web_sales/part-01923|tail -2
2451126|36400|2451202|302374|9455484|1765279|2274|6715269|2004559|472683|5807|
2451126|36400|2451195|289906|9455484|1765279|2274|6715269|2004559|472683|5807|

Indeed, we have CSV files on HDFS. Let's fetch them from the database.

New type of External table, new events and new item in the query plan

With Big Data SQL we introduce new types of external tables (ORACLE_HIVE and ORACLE_HDFS), a new wait event (cell external table smart scan), and a new plan operation (external table access storage full). Over this HDFS directory, I've defined an Oracle external table, like this:

CREATE TABLE WEB_SALES_EXT (
  SS_SOLD_DATE_SK NUMBER,
  SS_NET_PROFIT NUMBER
)
ORGANIZATION EXTERNAL
( TYPE ORACLE_HIVE
  DEFAULT DIRECTORY "DEFAULT_DIR"
  ACCESS PARAMETERS
  ( com.oracle.bigdata.cluster=democluster
    com.oracle.bigdata.tablename=web_sales)
)
REJECT LIMIT UNLIMITED
PARALLEL;

After table creation, I'm able to query the data from the database. To begin, I run a very simple query that calculates the minimum value of some column and has a filter on it. Then, I can use Oracle Enterprise Manager to determine how my query was processed:

SELECT min(w.ws_sold_time_sk)
FROM WEB_SALES w
WHERE w.ws_sold_date_sk = 2451047

We can see the new wait event, "cell external table smart scan":

and the new plan operation, "external table access storage full":

To make sure that your table now exists in the Oracle dictionary, you can run the following query:

SQL> SELECT t.OBJECT_NAME, t.OBJECT_TYPE
FROM user_objects t
WHERE object_name='WEB_RETURNS';

OBJECT_NAME OBJECT_TYPE
----------- -----------
WEB_RETURNS TABLE

Big Data SQL also adds a new view to Oracle's data dictionary - ALL_HIVE_TABLES:

SQL> SELECT table_name, LOCATION, table_type
FROM ALL_HIVE_TABLES
WHERE TABLE_NAME='web_returns';

TABLE_NAME  LOCATION                              TABLE_TYPE
----------- ------------------------------------- --------------
web_returns hdfs://democluster-ns/.../web_returns EXTERNAL_TABLE

See, querying Hadoop with Oracle is easy! In my next blog posts, we’ll look at more complicated queries!

Wednesday Jan 13, 2016

BIWA 2016 - here's my list of must-attend sessions and labs

It's almost here - the 2016 BIWA conference at the Oracle Conference Center. The conference starts on January 26 with a welcome by the conference leaders at 8:30am. The BIWA summit is dedicated to providing all the very latest information and best practices for data warehousing, big data, spatial analytics, and BI. This year the conference has expanded to include the most important query language on the planet: SQL. There will be a whole track dedicated to YesSQL! The full agenda is available here.

Unfortunately I won’t be able to attend this year’s conference but if I was going to be there, then this would be my list of must-attend sessions and hands-on labs.

[Read More]

Thursday Jan 07, 2016

Data loading into HDFS - Part1

Today I'm going to start the first article in a series devoted to a very important topic in the Hadoop world – data loading into HDFS. Before anything else, let me explain the different approaches to loading and processing data in different IT systems.

Schema on Read vs Schema on Write

When we talk about data loading, we usually load into a system that belongs to one of two types. The first is schema on write. With this approach we have to define columns, data formats, and so on up front. During reads, every user will observe the same data set. Because we have already performed ETL (transforming the data into the format most convenient for a particular system), reading will be pretty fast and overall system performance will be pretty good. But you should keep in mind that we already paid the penalty for this when loading the data. As an example of a schema-on-write system, consider a relational database such as Oracle or MySQL.


Schema on Write

The other approach is schema on read. In this case we load data as-is, without any changes or transformations. With this approach we skip the ETL step (we don't transform the data) and have no headaches about data format and data structure: we just load files onto the file system, like copying photos from a flash card or external storage to your laptop's disk. How to interpret the data is decided at read time. Interestingly, the same data (the same files) can be read in different ways. For instance, if you have binary data, you can define a serialization/deserialization framework and use it within your SELECT to get structured data; otherwise you will just get a set of bytes. As another example, even with the simplest CSV files you can read the same column as a number or as a string, which will give different results for sorting and comparison operations.
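
To make the last point concrete, here is a tiny, purely illustrative Scala sketch (the values are made up) showing how the same raw CSV field behaves differently depending on how you choose to read it:

// Schema on read: nothing in the file fixes the type of the first field.
val line = "00042,2015-05-11"
val rawId = line.split(",")(0)   // "00042" read as a String
val numericId = rawId.toInt      // 42 read as an Int
// String comparison and numeric comparison give different answers:
println("00042" < "0009")        // true  -- lexicographic order
println(numericId < 9)           // false -- numeric order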

Schema on Read

The Hadoop Distributed File System is a classic example of a schema-on-read system. You can find more details about the schema-on-read and schema-on-write approaches here. Now we are going to talk about loading data into HDFS. I hope that after the explanation above you understand that loading data into Hadoop is not the same as ETL (the data is not transformed).

[Read More]

Thursday Dec 24, 2015

Oracle Big Data Lite 4.3.0 is Now Available on OTN

Big Data Lite 4.3.0 is now available on OTN



This latest release is packed with new features - here's the inventory of what's included:

  • Oracle Enterprise Linux 6.7
  • Oracle Database 12c Release 1 Enterprise Edition (12.1.0.2) - including Oracle Big Data SQL-enabled external tables, Oracle Multitenant, Oracle Advanced Analytics, Oracle OLAP, Oracle Partitioning, Oracle Spatial and Graph, and more.
  • Cloudera Distribution including Apache Hadoop (CDH5.4.7)
  • Cloudera Manager (5.4.7)
  • Oracle Big Data Spatial and Graph 1.1
  • Oracle Big Data Discovery 1.1.1
  • Oracle Big Data Connectors 4.3
    • Oracle SQL Connector for HDFS 3.4.0
    • Oracle Loader for Hadoop 3.5.0
    • Oracle Data Integrator 12c
    • Oracle R Advanced Analytics for Hadoop 2.5.1
    • Oracle XQuery for Hadoop 4.2.1
  • Oracle NoSQL Database Enterprise Edition 12cR1 (3.4.7)
  • Oracle Table Access for Hadoop and Spark 1.0
  • Oracle JDeveloper 12c (12.1.3)
  • Oracle SQL Developer and Data Modeler 4.1.2 with Oracle REST Data Services 3.0
  • Oracle Data Integrator 12cR1 (12.2.1)
  • Oracle GoldenGate 12c
  • Oracle R Distribution 3.2.0
  • Oracle Perfect Balance 2.5.0

Also, this release is using GitHub as the repository for all of our sample code (https://github.com/oracle/BigDataLite). This gives us a great mechanism for updating the samples/demos between releases. Users simply double click the "Refresh Samples" icon on the desktop to download the latest collateral.

Friday Dec 18, 2015

Oracle among vendors that lead the pack in The Forrester Wave™: Enterprise Data Warehouse, Q4 2015, achieving highest scores for current product offering and product strategy

According to Forrester, a leading independent research firm, EDWs are now evolving beyond traditional storage and delivery. It is now scale, performance, and innovation that distinguish the EDW leaders. Oracle was top ranked by Forrester in the current offering category. Furthermore, Oracle was top ranked in the strategy category.  
[Read More]

Monday Dec 07, 2015

Multiple Parallelizers

The number of PX servers that a SQL statement needs depends on the execution plan and the degree of parallelism (DOP) of the statement. Most statements use DOP*2 PX servers as a result of the producer/consumer model used in parallel execution.

We sometimes get questions from users about statements using many more PX servers than they expect. They say, "I have a statement with a DOP of 8 and I see that it uses 64 processes." These types of statements typically involve multiple parallelizers. We looked at how you can identify such statements in the previous post; that post also covers basic terminology, so please read it before going into this one.

In this post we will look at a few example cases that can generate plans with multiple parallelizers and how those plans behave at runtime. We will also talk about the implications of having such statements in your system. The examples used here are based on 12c. Ideally a user should be able to control the number of PX servers for a statement; this is not the case when you have multiple parallelizers. We are trying to minimize the cases of multiple parallelizers, but I wanted to explain what the current behavior is.

Multiple nonconcurrent parallelizers

The typical case where you can see multiple parallelizers is temp table transformation. With temp table transformation Oracle creates temporary tables at runtime, stores intermediate results in those tables and queries them during the execution of the statement. The optimizer can decide to use temp table transformation with subquery factoring a.k.a. WITH clause, grouping sets, star queries, and in-memory aggregation.

Let's look at a simple query with grouping sets as an example to understand how multiple parallelizers work.

SELECT /*+ parallel(2) */ channel_desc, calendar_month_desc, SUM(amount_sold)
FROM sales, times, channels
WHERE sales.time_id=times.time_id 
AND sales.channel_id= channels.channel_id
GROUP BY GROUPING SETS(channel_desc, calendar_month_desc);

This plan has 4 parallelizers (Line Ids 2, 11, 20, 29). The parallelizers #1 and #4 have one DFO each which means each of them needs one PX server set (2 PX servers as the DOP is 2). The parallelizers #2 and #3 have 2 DFOs each which means they need 2 PX server sets (4 PX servers as the DOP is 2). The important question here is, will these parallelizers execute concurrently? If they are concurrent this statement will use 12 PX servers, if they are not concurrent the statement can run with fewer PX servers.

In the case of temp table transformation parallelizers do not execute concurrently, they run one after the other. Here is a timeline of execution for this plan.

T0: First parallelizer #1 starts and allocates 2 PX servers, it joins three tables and loads the result set into a temporary table (SYS_TEMP_0FD9D6719_E74332).

T1: When this parallelizer finishes it releases the PX servers back to the pool. These PX servers are now available for anyone to use.

T2: Now parallelizer #2 can start, it allocates 4 PX servers from the pool, these may be the same ones released by the previous parallelizer or they may be different PX servers from the pool. This parallelizer reads from the temporary table created previously, does the group by operation and loads the results into a new temporary table (SYS_TEMP_0FD9D671A_E74332).

T3: Now it releases the PX servers back to the pool and these PX servers become available in the system.

T4, T5: Parallelizer #3 does the same thing, it allocates 4 PX servers and releases them when it finishes.

T6, T7: Now the last parallelizer (#4) starts, it allocates 2 PX servers, reads from the temporary table created previously (SYS_TEMP_0FD9D671A_E74332), sends the results to the user and releases the PX servers when it finishes.

This sequence of execution shows that the maximum number of PX servers used concurrently was 4, not more. In this case we used DOP*2 number of PX servers concurrently even though we had multiple parallelizers. This is because a parallelizer allocates PX servers only when it starts and releases them when it finishes. A parallelizer is started only when it is needed in the runtime execution of the plan.

Here is what v$pq_sesstat shows after running this query.

STATISTIC                      LAST_QUERY SESSION_TOTAL     CON_ID
------------------------------ ---------- ------------- ----------
Queries Parallelized                    1             1          0
DML Parallelized                        0             4          0
DDL Parallelized                        0             0          0
DFO Trees                               1             5          0
Server Threads                          2             0          0
Allocation Height                       2             0          0
Allocation Width                        1             0          0
Local Msgs Sent                        86          3113          0
Distr Msgs Sent                         0             0          0
Local Msgs Recv'd                      90          3123          0
Distr Msgs Recv'd                       0             0          0
DOP                                     2             0          0
Slave Sets                              1             0          0

Even though we used 4 PX servers concurrently, it shows we used 2 (Server Threads); this is because v$pq_sesstat only shows stats from the last active parallelizer, which was Line Id 32 in this case.

To find the actual number of PX servers used concurrently you need to look at v$px_session when the statement is running. Here is what that view shows when the statement is running and the second parallelizer is active (my session's SID which is also the QC SID was 26 in this case).

select qcsid,sid,server_group,server_set,server#
from v$px_session where qcsid=26

     QCSID        SID SERVER_GROUP SERVER_SET    SERVER#
---------- ---------- ------------ ---------- ----------
        26        791            1          1          1 <--- Server Set #1, PX Server #1
        26         14            1          1          2 <--- Server Set #1, PX Server #2
        26        786            1          2          1 <--- Server Set #2, PX Server #1
        26         33            1          2          2 <--- Server Set #2, PX Server #2
        26         26                                    <--- Query Coordinator

This shows that at the time I looked at v$px_session there were 5 sessions working for my query, one was the QC (sid=26), the others were 4 PX server sessions. There was one parallelizer active (server_group=1) and it had 2 PX server sets (server_set) and 4 PX servers (server#). As the statement proceeds you will see that this view will show the active parallelizers and PX servers.

SQL Monitor also nicely shows which parallelizers are active by showing the active operations in the plan.

In this screenshot you can see that only the first parallelizer is active at this time.

Multiple concurrent parallelizers

A typical case where multiple parallelizers run concurrently is noncorrelated subqueries. Here is an example showing this case.

SELECT /*+ parallel(2) */ DISTINCT prod_id
FROM sales
WHERE amount_sold >
  (SELECT AVG(amount_sold) FROM sales)
AND quantity_sold >
  (SELECT AVG(quantity_sold) FROM sales)  ;

We have 3 parallelizers in this plan, Line Ids 1, 10, and 16. Here is a timeline of the execution for this plan.

T0: This plan starts with parallelizer #1, since this parallelizer needs 2 PX server sets it allocates 4 PX servers as the DOP is 2. This parallelizer requires the outputs of the filter subqueries so now the other parallelizers will be started.

T1: While still keeping the initial 4 PX servers allocated, we now start the second parallelizer (#2). This parallelizer only needs one PX server set, so it allocates 2 PX servers. We now have a total of 6 PX servers concurrently allocated for this query.

T2: When the first subquery finishes running this second parallelizer is now finished, so it releases 2 PX servers it allocated.

T3, T4: Now the third parallelizer (#3) starts, allocates 2 PX servers and releases them when it finishes.

The first parallelizer now has the results of both subqueries and can continue scanning and filtering the sales table. So, this query runs at most 2 parallelizers concurrently and uses at most 6 PX servers at any point.

UPDATE, 14 Dec 2015: The actual order of allocating PX servers is parallelizer #1, #3, and #2. Please see the comments for this correction.

If we look at v$pq_sesstat after this query ends, it reports 8 PX servers (Server Threads), not 6. This is because this view does not show the number of PX servers used concurrently; it shows the accumulated number of PX servers used during the execution, even if the same PX servers were released and reallocated.

STATISTIC                      LAST_QUERY SESSION_TOTAL     CON_ID
------------------------------ ---------- ------------- ----------
Queries Parallelized                    1             3          0
DML Parallelized                        0             0          0
DDL Parallelized                        0             0          0
DFO Trees                               3             7          0
Server Threads                          8             0          0
Allocation Height                       2             0          0
Allocation Width                        1             0          0
Local Msgs Sent                       120           296          0
Distr Msgs Sent                         0             0          0
Local Msgs Recv'd                     120           296          0
Distr Msgs Recv'd                       0             0          0
DOP                                     2             0          0
Slave Sets                              4             0          0

Again, if you want to find out the number of PX servers allocated concurrently for statements with multiple parallelizers use the view v$px_session instead.

Implications of multiple parallelizers

Downgrades

A statement with a single parallelizer allocates the required number of PX servers at the start and uses them without releasing them until it finishes, so the number of PX servers throughout the execution is constant. Statements with multiple parallelizers are different, as we saw in the above examples: they allocate PX servers when each parallelizer starts. Since parallelizers can start at different times during the execution, each parallelizer may be running with a different number of PX servers based on the number of available processes in the system. Basically, the rules about DOP downgrades I talked about before apply to each parallelizer individually.

Consider the sequence of execution above. When parallelizer #1 starts it will try to allocate 2 PX servers. Assuming there are enough available PX servers in the system it will get those processes. When it finishes and releases them parallelizer #2 will start and try to allocate 4 PX servers. If at this point there are no available PX servers in the system this parallelizer will run serially. The same is true for the subsequent parallelizers.

Parallel Statement Queuing

Parallel Statement Queuing decides whether to queue or run a parallel statement based on its DOP and the number of available PX servers, and it assumes no statement will use more than 2 PX server sets (thus DOP*2 number of PX servers). Consider the case where the DOP is 8 and there are 16 PX servers available below parallel_servers_target: Oracle will allow this query to run, but if the statement uses multiple concurrent parallelizers and starts using 24 PX servers, the queuing point will be exceeded. Once a statement is picked from the queue and allowed to run, it is free to allocate any number of PX servers it needs. Depending on how many of these kinds of statements run at the same time, all PX servers in the system can be consumed. This effectively eliminates the benefit of queuing, and statements may start getting downgraded because of a PX server shortage in the system. So, if you are using Parallel Statement Queuing and you see that parallel_servers_target is exceeded, look for statements with multiple parallelizers as possible suspects. We are working to fix this behavior in future releases. Until then make sure there is enough gap between parallel_servers_target and parallel_max_servers to prevent downgrades.
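
A quick way to see the current gap between the two parameters is a simple query like the following (just a sketch; the values returned obviously depend on your system):

SQL> select name, value
  2  from   v$parameter
  3  where  name in ('parallel_servers_target', 'parallel_max_servers');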

Database Resource Manager DOP limits

We have had some users setting DOP limits using Database Resource Manager (DBRM) and expecting all statements to be limited to DOP*2 number of PX servers. As of today that expectation does not hold for statements with multiple concurrent parallelizers. For example, even if DBRM limits the DOP to 2, a statement with a DOP of 2 can use 6 PX servers concurrently, as we saw in example #2.
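
For illustration, a DOP limit is typically set with the parallel_degree_limit_p1 attribute of a resource plan directive. The sketch below is hypothetical (the plan name DOP_LIMIT_PLAN and the consumer group REPORTING_GROUP are made-up names), and, as discussed above, it limits the DOP of each statement, not the total number of PX servers a statement with multiple parallelizers can allocate:

BEGIN
  DBMS_RESOURCE_MANAGER.CREATE_PENDING_AREA();
  DBMS_RESOURCE_MANAGER.CREATE_PLAN(
    plan    => 'DOP_LIMIT_PLAN',                    -- hypothetical plan name
    comment => 'Plan that caps the per-statement DOP');
  DBMS_RESOURCE_MANAGER.CREATE_PLAN_DIRECTIVE(
    plan                     => 'DOP_LIMIT_PLAN',
    group_or_subplan         => 'REPORTING_GROUP',  -- hypothetical consumer group
    comment                  => 'Limit DOP to 2',
    parallel_degree_limit_p1 => 2);
  DBMS_RESOURCE_MANAGER.CREATE_PLAN_DIRECTIVE(
    plan             => 'DOP_LIMIT_PLAN',
    group_or_subplan => 'OTHER_GROUPS',             -- mandatory catch-all directive
    comment          => 'Default directive');
  DBMS_RESOURCE_MANAGER.VALIDATE_PENDING_AREA();
  DBMS_RESOURCE_MANAGER.SUBMIT_PENDING_AREA();
END;
/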

Summary

Here is a short summary of what we have discussed in this post.

1. Multiple parallelizers can run concurrently or nonconcurrently.

2. Each parallelizer allocates PX servers when it starts; the number of PX servers allocated by each parallelizer depends on its number of PX server sets and can be at most DOP*2.

3. v$pq_sesstat may show incorrect information depending on the concurrency of multiple parallelizers; watch the behavior at runtime (for example via v$px_session) instead.

4. Each parallelizer may or may not get the required number of PX servers depending on the number of available PX servers in the system.

5. Parallel Statement Queuing assumes each statement will use DOP*2 number of PX servers; this can allow statements with multiple parallelizers to run and then exceed the queuing point (parallel_servers_target). To prevent downgrades in this case, make sure there is enough gap between parallel_servers_target and parallel_max_servers.

I am planning to cover Parallel Statement Queuing in detail in a future post; we will come back to this topic there, so please check back for that one too.

In each release we are trying to decrease the number of cases that can generate plans with multiple parallelizers. If you are on 11g you will see that some plans with multiple parallelizers will switch to using a single parallelizer when you upgrade to 12c. In the next post I will talk about those cases and compare 11g to 12c in this regard.

Thursday Dec 03, 2015

Looking forward to #ukoug_tech15 conference

Next week I will be in Birmingham at the annual UKOUG Tech conference. I will be presenting on some of the new SQL features that we added in Database 12c to support the analysis of big data. To give you a taster of what to expect during my session, here is a sort-of relevant Dilbert cartoon, courtesy of Scott Adams:

Dilbert-Approximate-Answers


 

[Read More]

Tuesday Dec 01, 2015

My highlights from DOAG 2015...

Last week I was in Nuremberg at the excellent annual German Oracle user group conference - DOAG15. This was my first visit to DOAG and I was amazed at the size of the event. It is huge, but most importantly it is very well organized and definitely one of the best, possibly the best, user conferences that I have attended.

...and then there was the chance to drink some glühwein with the database PM team on the way to the speaker dinner.

Nuremburg

[Read More]

Tuesday Nov 24, 2015

Little things to know about ... Oracle Partitioning (part one of hopefully many)

Oracle Partitioning is one of the most commonly used options on top of Enterprise Edition - if not the most commonly used one, which is, as you can guess, always a discussion in our buildings ;-)

Over the years Oracle Partitioning has matured significantly and become more powerful and flexible. But, as in real life, with power and flexibility always come little things that are good to know (no, that's not a euphemism for complexity). So I am happy to see Connor McDonald blogging about just such a little detail around Interval Partitioning.

Check it out, it's worth it.

Monday Nov 23, 2015

PX Server Sets, Parallelizers, DFO Trees, Parallel Groups, ... Let's Talk About Terminology

In my last post I mentioned cases where a SQL statement uses more PX servers than expected, I said "There are rare cases when a statement uses more than DOP*2 number of PX servers depending on the plan shape.". I started this post to talk about those cases but then I thought maybe we should clarify the terminology before doing that. So let's go over the basic parallel execution terminology we use in the documentation and in the monitoring tools (V$ views, SQL Monitor, etc...).

DFO (Data Flow Operation), PX Server Set

A DFO is the basic unit of work carried out by PX servers in an execution plan. The set of PX servers working on a DFO is called a PX server set. The number of PX servers in a set is determined by the statement DOP.

Consider the following example.

SQL> explain plan for select /*+ parallel(2) */ * from sales;

Explained.

SQL> select * from table(dbms_xplan.display);
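
For reference, this statement produces a plan shape roughly like the following (a hedged, abridged sketch assuming an SH-like schema; columns such as Rows, Cost, TQ, IN-OUT and PQ Distrib are omitted and the exact output can vary by version and statistics):

-------------------------------------------
| Id  | Operation              | Name     |
-------------------------------------------
|   0 | SELECT STATEMENT       |          |
|   1 |  PX COORDINATOR        |          |
|   2 |   PX SEND QC (RANDOM)  | :TQ10000 |
|   3 |    PX BLOCK ITERATOR   |          |
|   4 |     TABLE ACCESS FULL  | SALES    |
-------------------------------------------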

As a general rule, DFO boundaries are indicated by two columns in an execution plan: the lines having PX SEND in the Operation column, or a TQ (Table Queue) name in the Name column, mark DFO boundaries. In this plan we have only one DFO, indicated by Line Id 2. This means the statement will use one PX server set with 2 PX servers as the DOP is 2.

Only very basic statements can be executed with a single DFO; most statements are executed by multiple DFOs. The following plan shows 2 DFOs, which means 2 PX server sets.

SQL> explain plan for 
  2  select /*+ parallel(2) */ cust_id,count(*)
  3  from sales
  4  group by cust_id;

Explained.

SQL> select * from table(dbms_xplan.display);
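
A representative plan shape for this statement (again a hedged, abridged sketch; exact line numbers can vary, but the discussion below assumes this shape):

----------------------------------------------
| Id  | Operation                 | Name     |
----------------------------------------------
|   0 | SELECT STATEMENT          |          |
|   1 |  PX COORDINATOR           |          |
|   2 |   PX SEND QC (RANDOM)     | :TQ10001 |
|   3 |    HASH GROUP BY          |          |
|   4 |     PX RECEIVE            |          |
|   5 |      PX SEND HASH         | :TQ10000 |
|   6 |       HASH GROUP BY       |          |
|   7 |        PX BLOCK ITERATOR  |          |
|   8 |         TABLE ACCESS FULL | SALES    |
----------------------------------------------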

We can see this again by looking at the PX SEND operations at Line Ids 2 and 5. Since we have 2 DFOs, this statement will use 2 PX server sets; each set will have 2 PX servers as the DOP is 2, so this statement needs a total of 4 PX servers. One PX server set will perform Line Ids 5-8, the other set will perform Line Ids 2-4.

DFO Tree, Parallelizer

Any PX COORDINATOR in an execution plan is called a parallelizer. As we saw in the above example, there may be multiple DFOs under a parallelizer; these DFOs are grouped under a DFO tree, so the terms parallelizer and DFO tree are used interchangeably. In the above examples we see that there is one parallelizer, meaning one DFO tree.

A DFO tree in the plan is carried out by at most 2 PX server sets. If there is only one DFO under the DFO tree, as in the first example, there will be only one PX server set. If there are two or more DFOs under the DFO tree, 2 PX server sets will be used; we limit the number of PX server sets to 2 per DFO tree. This is why most statements use 2 PX server sets, meaning they use DOP*2 number of PX servers. Let's look at a plan that has 3 DFOs.

SQL> explain plan for 
  2  select /*+ parallel(2) */ count(*)
  3  from sales s, customers c
  4  where s.cust_id=c.cust_id;

Explained.

SQL> select * from table(dbms_xplan.display);
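
One possible plan shape for this join (a hedged sketch assuming hash distribution on both join inputs, in which case the join may also appear as HASH JOIN BUFFERED; the actual distribution method and line numbers depend on optimizer statistics):

------------------------------------------------
| Id  | Operation                  | Name      |
------------------------------------------------
|   0 | SELECT STATEMENT           |           |
|   1 |  SORT AGGREGATE            |           |
|   2 |   PX COORDINATOR           |           |
|   3 |    PX SEND QC (RANDOM)     | :TQ10002  |
|   4 |     SORT AGGREGATE         |           |
|   5 |      HASH JOIN             |           |
|   6 |       PX RECEIVE           |           |
|   7 |        PX SEND HASH        | :TQ10000  |
|   8 |         PX BLOCK ITERATOR  |           |
|   9 |          TABLE ACCESS FULL | CUSTOMERS |
|  10 |       PX RECEIVE           |           |
|  11 |        PX SEND HASH        | :TQ10001  |
|  12 |         PX BLOCK ITERATOR  |           |
|  13 |          TABLE ACCESS FULL | SALES     |
------------------------------------------------

In this shape the three PX SEND lines (Ids 3, 7 and 11) mark the three DFOs, all under a single PX COORDINATOR.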

Even though we have 3 DFOs, there is only one DFO tree (one PX COORDINATOR). This means this statement will need 2 PX server sets, that is 4 PX servers as the DOP is 2.

Finding the Number of Parallelizers and PX Server Sets

You can find the number of PX server sets for a statement using the view v$pq_sesstat. If we run the last example statement above, here is what we see.

SQL> select /*+ parallel(2) */ count(*)
  2  from sales s, customers c
  3  where s.cust_id=c.cust_id;

  COUNT(*)
----------
   9584391

SQL> select * from v$pq_sesstat;

STATISTIC                      LAST_QUERY SESSION_TOTAL     CON_ID
------------------------------ ---------- ------------- ----------
Queries Parallelized                    1             1          0
DML Parallelized                        0             0          0
DDL Parallelized                        0             0          0
DFO Trees                               1             1          0
Server Threads                          4             0          0
Allocation Height                       2             0          0
Allocation Width                        1             0          0
Local Msgs Sent                    387913        387913          0
Distr Msgs Sent                         0             0          0
Local Msgs Recv'd                  387913        387913          0
Distr Msgs Recv'd                       0             0          0
DOP                                     2             0          0
Slave Sets                              2             0          0

13 rows selected.

It shows that we had 1 parallelizer (indicated by the statistic DFO Trees), 2 PX server sets (indicated by the statistic Slave Sets), and 4 PX servers (indicated by the statistic Server Threads), and that the DOP was 2.

SQL Monitor shows the DFO trees and PX server sets in the Parallel tab of the SQL Monitor report as can be seen in the below screenshot.

It shows the PX server sets as "Parallel Set" and PX servers as "Parallel Server". It seems like we are doing everything to confuse the users by using different names everywhere. In this SQL Monitor report we see that we had 2 PX server sets and 4 PX servers.
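
If you do not have access to the SQL Monitor UI, a report can also be generated from SQL*Plus. Here is a minimal sketch (the sql_id value is a placeholder you need to substitute; the TEXT report shows the parallel execution details in its own section, while the tab-based layout mentioned above belongs to the ACTIVE/HTML form of the report):

SQL> set long 1000000 longchunksize 1000000 pagesize 0 linesize 250
SQL> select dbms_sqltune.report_sql_monitor(
  2           sql_id => 'your_sql_id',   -- placeholder: SQL_ID of the parallel statement
  3           type   => 'TEXT')          -- use 'ACTIVE' for the interactive report
  4  from   dual;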

Multiple Parallelizers

Some statements can have more than one parallelizer (DFO tree). Since each parallelizer can use 2 PX server sets these statements can use more than DOP*2 number of PX servers. You can identify such statements by looking at the plan as explained above. If the plan has multiple PX coordinators it means the statement has multiple parallelizers. The following plan has 2 parallelizers as indicated by the number of PX coordinators.

SQL> explain plan for
  2  select /*+ parallel(2) */ cust_id, (select max(cust_id) from customers)
  3  from customers
  4  order by cust_id;

Explained.

SQL> select * from table(dbms_xplan.display);
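
A representative plan shape for this statement (a hedged sketch; the exact operations and TQ names can vary, but note the two PX COORDINATOR lines, one for the scalar subquery and one for the main query block):

----------------------------------------------
| Id  | Operation                | Name      |
----------------------------------------------
|   0 | SELECT STATEMENT         |           |
|   1 |  SORT AGGREGATE          |           |
|   2 |   PX COORDINATOR         |           |
|   3 |    PX SEND QC (RANDOM)   | :TQ20000  |
|   4 |     SORT AGGREGATE       |           |
|   5 |      PX BLOCK ITERATOR   |           |
|   6 |       TABLE ACCESS FULL  | CUSTOMERS |
|   7 |  PX COORDINATOR          |           |
|   8 |   PX SEND QC (ORDER)     | :TQ10001  |
|   9 |    SORT ORDER BY         |           |
|  10 |     PX RECEIVE           |           |
|  11 |      PX SEND RANGE       | :TQ10000  |
|  12 |       PX BLOCK ITERATOR  |           |
|  13 |        TABLE ACCESS FULL | CUSTOMERS |
----------------------------------------------

The scalar subquery's DFO tree has a single DFO (one PX server set) while the main query block's DFO tree has two DFOs (two PX server sets), which is consistent with the 3 Slave Sets and 6 Server Threads reported below.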

Here is what v$pq_sesstat shows after running this statement.

STATISTIC                      LAST_QUERY SESSION_TOTAL     CON_ID
------------------------------ ---------- ------------- ----------
Queries Parallelized                    1             7          0
DML Parallelized                        0             0          0
DDL Parallelized                        0             0          0
DFO Trees                               2            10          0
Server Threads                          6             0          0
Allocation Height                       2             0          0
Allocation Width                        1             0          0
Local Msgs Sent                     11878       1190228          0
Distr Msgs Sent                         0             0          0
Local Msgs Recv'd                   11873       1190215          0
Distr Msgs Recv'd                       0             0          0
DOP                                     2             0          0
Slave Sets                              3             0          0

We had 2 parallelizers (DFO Trees), 3 PX server sets (Slave Sets) and 6 PX servers (Server Threads). There are cases where v$pq_sesstat shows incorrect information when the plan has multiple parallelizers, so do not rely on this information for plans with multiple parallelizers; I will talk about those cases in the next post.

SQL Monitor shows DFO trees as Parallel Group in the Parallel tab as can be seen in the below screenshot.

Again we see that we had 2 parallelizers and 3 PX server sets.

After covering the basic terminology we can now talk about multiple parallelizers in more detail and look at how they work. Stay tuned for the next post. UPDATE 9 Dec 2015: That post is now published. 

Friday Nov 20, 2015

Review of Data Warehousing and Big Data at #oow15

DW BigData Review Of OOW15

This year OpenWorld was bigger, more exciting and packed with sessions about the very latest technology and product features. Most importantly, both data warehousing and Big Data were at the heart of this year’s conference across a number of keynotes and a huge number of general sessions. Our hands-on labs were all completely full as people got valuable hands-on time with our most important new features. The key focus areas at this year’s conference were:
  • Database 12c for Data Warehousing
  • Big Data and the Internet of Things 
  • Cloud from on-premise to on the Cloud to running hybrid Cloud systems
  • Analytics and SQL continue to evolve to enable more and more sophisticated analysis. 
All these topics appeared across the main keynote sessions, including live on-stage demonstrations of how many of our new features can be used to increase the performance and analytical capability of your data warehouse and big data management system - check out the on-demand videos for the keynotes and executive interviews....
[Read More]