Application Development

Pipelined Table Functions

Pass data back to the calling query before the function is completed.

By Steven Feuerstein

November/December 2018

In my two previous articles, “When Is a Function Like a Table? When It’s a Table Function!” and “Streaming Table Functions,” I introduced table functions (functions that can be called in the FROM clause of a SELECT statement as if they were tables) and then took a closer look at streaming table functions. In this article, I explore a very special kind of table function: the pipelined table function.

Pipelined table functions are table functions that return or “pipe” rows back to the calling query as the function produces the data in the desired form—and before the function has completed all of its processing.

Before we dive into the implementation and application of pipelined table functions, it is important to understand how unusual the above statement is. PL/SQL is not a multithreaded language. Generally, when a PL/SQL block (anonymous, nested, procedure, function, and so on) is invoked, further processing in that session is “on hold” (suspended) until the block returns control to the host that invoked the block, whether that host is another PL/SQL block, a SQL statement, or a host language such as Java.

Normal (nonpipelined) table functions act in precisely this way. Each time the table function is invoked in the FROM clause, the SQL engine must wait until a RETURN statement is executed to pass back the collection to the SELECT statement for conversion into rows and columns.

This blocking behavior can have a negative impact on overall performance of the SELECT statement, especially in the ETL (extract, transform, and load) operations of a data warehouse. In addition, with each element added to the collection in the table function, more and more Program Global Area (PGA) memory is consumed. For very large datasets, this can lead to further performance degradation and even errors.

Pipelined table functions get around both of these problems. Let’s take a look.

A Very Simple Example

Let’s start our exploration of pipelined table functions (which I also refer to as PTFs in this article) by comparing a very simple regular (nonpipelined) table function with a very simple PTF.

First, I create a schema-level nested table type of strings.

CREATE OR REPLACE TYPE strings_t IS TABLE OF VARCHAR2 (100);
/

Next, I compile a table function that returns a nested table of that type with a single string inside it:

CREATE OR REPLACE FUNCTION strings
   RETURN strings_t
   AUTHID DEFINER
IS
   l_strings strings_t := strings_t ('abc');
BEGIN
   RETURN l_strings;
END;
/

When I call the table function in a SELECT statement, it returns abc:

SELECT COLUMN_VALUE my_string FROM TABLE (strings ())
/
MY_STRING
—————————
abc

Now I will create a pipelined version of that same table function.

CREATE OR REPLACE FUNCTION strings_pl
   RETURN strings_t
   PIPELINED
   AUTHID DEFINER
IS
BEGIN
   PIPE ROW ('abc');
   RETURN;
END;
/

And when I run it, I see the same results:

SELECT COLUMN_VALUE my_string FROM TABLE (strings_pl ())
/
MY_STRING
—————————
abc

Now let’s explore the differences between the code in these regular and pipelined table functions:

Regular table functions             Pipelined table functions
No keyword in header                PIPELINED appears in header
Declares local collection           No declaration of a local collection
Populates collection with string    Passes string to PIPE ROW
Returns the collection              Returns nothing but control

And there’s one other big difference between regular and pipelined table functions: regular table functions can be called in PL/SQL, but pipelined table functions can be called only from within a SELECT statement, as you can see in the error message below:

DECLARE
   l_strings   strings_t := strings_t ();
BEGIN
   l_strings := strings_pl ();
END;
/

PLS-00653: aggregate/table functions are not allowed in PL/SQL scope

This should not be entirely unexpected. Because PL/SQL is not a multithreaded language, it cannot accept rows that are “piped” back before the function terminates execution.
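
The restriction applies only to invoking the function directly. A PL/SQL block can still consume a pipelined table function by wrapping the call in a SELECT statement. The block below is a minimal sketch of that idea, assuming the strings_t type and the strings_pl function created above are in place; it gathers the piped rows with BULK COLLECT.

```sql
DECLARE
   l_strings   strings_t;
BEGIN
   -- The pipelined function is called from SQL, not directly from PL/SQL,
   -- so PLS-00653 is not raised.
   SELECT COLUMN_VALUE
     BULK COLLECT INTO l_strings
     FROM TABLE (strings_pl ());

   DBMS_OUTPUT.put_line ('Elements fetched: ' || l_strings.COUNT);
END;
/
```

Note that this approach gathers every piped row into a local collection, so it gives up the memory advantage that pipelining offers for large datasets.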

Of course, in this incredibly basic pipelined table function, there will be no issue of blocking or memory consumption. But it gets across the basic differences in the process flow of a pipelined table function:

  1. Add the PIPELINED keyword to the function header.
  2. Use PIPE ROW to send the data back to the calling SELECT statement instead of adding the data to a local collection.
  3. Return nothing but control.

A Nontrivial Pipelined Table Function

Before taking a look at how pipelined table functions can help improve performance over nonpipelined table functions and also reduce PGA memory consumption, let’s build a more interesting PTF.

My transformation in this example is fairly trivial and could even be handled without a table function, with “pure SQL.” The data transformation requirement is to split each row in the STOCKS table into two rows in the TICKERS table.

I create the two tables:

CREATE TABLE stocks
(
   ticker        VARCHAR2 (20),
   trade_date    DATE,
   open_price    NUMBER,
   close_price   NUMBER
)
/

CREATE TABLE tickers
(
   ticker      VARCHAR2 (20),
   pricedate   DATE,
   pricetype   VARCHAR2 (1),
   price       NUMBER
)
/
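
For comparison, the “pure SQL” route mentioned above could look something like the following statement. It is only a sketch of one possible approach (a UNION ALL of two queries against STOCKS) and is not part of the listings in this article.

```sql
-- One possible pure-SQL version of the "doubling" transformation:
-- each STOCKS row yields an opening-price row and a closing-price row.
INSERT INTO tickers (ticker, pricedate, pricetype, price)
   SELECT ticker, trade_date, 'O', open_price FROM stocks
   UNION ALL
   SELECT ticker, trade_date, 'C', close_price FROM stocks
/
```

The table functions that follow perform the same transformation in PL/SQL, which pays off once the transformation logic becomes too complex to express comfortably in SQL alone.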

I then create the object type and nested table type needed for the table functions:

CREATE TYPE ticker_ot AS OBJECT
(
   ticker VARCHAR2 (20),
   pricedate DATE,
   pricetype VARCHAR2 (1),
   price NUMBER
);
/

CREATE TYPE tickers_nt AS TABLE OF ticker_ot;
/

As discussed in the streaming table function article, I will need to pass a dataset to the table functions, and for that I need a REF CURSOR type:

CREATE OR REPLACE PACKAGE stock_mgr
   AUTHID DEFINER
IS
   TYPE stocks_rc IS REF CURSOR
      RETURN stocks%ROWTYPE;
END stock_mgr;
/

I am now ready to create my table functions. The nonpipelined version is shown in Listing 1. It follows the typical streaming table function pattern:

  • Fetch rows from the incoming dataset.
  • Perform the transformation.
  • Put the transformed rows into the nested table.
  • Return the nested table.

Now let’s see how this pattern unfolds in my doubled_nopl function (one STOCKS row doubled to two TICKERS rows) and the line descriptions that follow.

Listing 1: Nonpipelined table function

CREATE OR REPLACE FUNCTION doubled_nopl (rows_in stock_mgr.stocks_rc)
   RETURN tickers_nt
   AUTHID DEFINER
IS
   TYPE stocks_aat IS TABLE OF stocks%ROWTYPE
      INDEX BY PLS_INTEGER;

   l_stocks    stocks_aat;

   l_doubled   tickers_nt := tickers_nt ();
BEGIN
   LOOP
      FETCH rows_in BULK COLLECT INTO l_stocks LIMIT 100;

      EXIT WHEN l_stocks.COUNT = 0;

      FOR l_row IN 1 .. l_stocks.COUNT
      LOOP
         l_doubled.EXTEND;
         l_doubled (l_doubled.LAST) :=
            ticker_ot (l_stocks (l_row).ticker,
                       l_stocks (l_row).trade_date,
                       'O',
                       l_stocks (l_row).open_price);

         l_doubled.EXTEND;
         l_doubled (l_doubled.LAST) :=
            ticker_ot (l_stocks (l_row).ticker,
                       l_stocks (l_row).trade_date,
                       'C',
                       l_stocks (l_row).close_price);
      END LOOP;
   END LOOP;

   RETURN l_doubled;
END;
/

Line(s)  Description
1        Use the REF CURSOR type defined in the package for the rows passed in. Because I am selecting from the STOCKS table, I use the stocks_rc type.
2        Return an array, each of whose elements looks just like a row in the TICKERS table.
5–8      Declare an associative array type, and a variable of that type, to hold rows fetched from the rows_in cursor variable.
10       Declare the local variable (a nested table) to be filled and then returned to the SELECT statement.
12       Start up a simple loop to fetch rows from the cursor variable. It’s already open; the CURSOR expression takes care of that.
13–15    Use the BULK COLLECT feature to retrieve as many as 100 rows with each fetch. I do this to avoid row-by-row processing, which is not efficient enough. Exit the loop when the associative array is empty.
17       For each element in the array (row from the cursor variable)...
19–24    Use EXTEND to add another element at the end of the nested table, then call the object type constructor to create the first (“opening”) of the two rows for the TICKERS table, and put the element into the new last index value of the collection.
26–31    Do the same for the second (“closing”) row of the TICKERS table.
35       Send the nested table back to the SELECT statement for streaming.

Now let’s create the pipelined version of the above function (see Listing 2) and call it doubled_pl. Take a look at the line descriptions that follow.

Listing 2: Pipelined table function

CREATE OR REPLACE FUNCTION doubled_pl (rows_in stock_mgr.stocks_rc)
   RETURN tickers_nt
   PIPELINED
   AUTHID DEFINER
IS
   TYPE stocks_aat IS TABLE OF stocks%ROWTYPE
      INDEX BY PLS_INTEGER;

   l_stocks   stocks_aat;
BEGIN
   LOOP
      FETCH rows_in BULK COLLECT INTO l_stocks LIMIT 100;

      EXIT WHEN l_stocks.COUNT = 0;

      FOR l_row IN 1 .. l_stocks.COUNT
      LOOP
         PIPE ROW (ticker_ot (l_stocks (l_row).ticker,
                              l_stocks (l_row).trade_date,
                              'O',
                              l_stocks (l_row).open_price));
         PIPE ROW (ticker_ot (l_stocks (l_row).ticker,
                              l_stocks (l_row).trade_date,
                              'C',
                              l_stocks (l_row).close_price));
      END LOOP;
   END LOOP;

   RETURN;
END;
/

Line(s)  Description
1        Because this is a streaming table function, it should come as no surprise that the parameter to the function is a cursor variable passing in rows of data from the invoking SELECT statement.
3        Specify that this is a pipelined table function.
6–9      Declare the associative array type, and the variable of that type, used to retrieve rows fetched from the cursor variable.
12–14    Fetch the next 100 rows; stop when the collection is empty.
16       For each row of data moved to the collection…
18–21    Use the object type constructor function to “transfer” opening-price data from the element in the collection to the attributes of the object type instance. Then immediately send that data back to the calling query with PIPE ROW.
22–25    Do the same thing for the closing-price data.
29       Return control back to the query.

Note that you do not need to explicitly close the cursor variable; Oracle Database will automatically close a cursor variable created with the CURSOR expression when the table function terminates.

Before exploring the impact on performance and memory, let’s verify that this pipelined table function can be used in a SELECT statement just like the nonpipelined version.

SELECT COUNT (*) FROM stocks
/

100000

INSERT INTO tickers
   SELECT * FROM TABLE (doubled_pl (CURSOR (SELECT * FROM stocks)))
/

SELECT COUNT (*)
  FROM tickers
/

200000

OK, that looks good.

Impact of Switch to Pipelined Table Functions

Let’s compare the behavior of streaming table functions, nonpipelined and pipelined. As discussed in my previous article on table functions, a streaming table function streams data directly from one process or transformation to the next without intermediate staging. It is a common use case in data warehouses.

Now let’s prove that the SQL engine is able to take those piped rows and put them immediately to work—and use less PGA memory along the way.

Proofs require hard facts, so I will construct a small utilities package to measure elapsed CPU time and memory consumption.

Note: For this package to compile in your schema, you will need the SELECT privilege on v$mystat and v$statname.
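
As a rough sketch of what that setup might look like: the v$ names are public synonyms, so the grants are normally issued on the underlying SYS views (v_$mystat and v_$statname). They also need to be granted directly to the schema owner rather than through a role, because privileges received through roles are not in effect when a definer's rights package is compiled. The schema name app_user below is hypothetical; substitute your own.

```sql
-- Run as a suitably privileged user (for example, SYS).
-- APP_USER is a placeholder for the schema that will own the UTILS package.
GRANT SELECT ON sys.v_$mystat TO app_user;
GRANT SELECT ON sys.v_$statname TO app_user;
```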

First, I create my API to the utility’s functionality:

CREATE OR REPLACE PACKAGE utils
IS
   PROCEDURE initialize (context_in IN VARCHAR2);

   PROCEDURE show_results (message_in IN VARCHAR2 := NULL);
END;
/

Then I compile the utility’s package body. Listing 3 includes the code, and a description of all the interesting bits follows.

Listing 3: utils package body code

CREATE OR REPLACE PACKAGE BODY utils
IS
   last_timing   INTEGER := NULL;
   last_pga      INTEGER := NULL;

   FUNCTION pga_consumed
      RETURN NUMBER
   AS
      l_pga   NUMBER;
   BEGIN
      SELECT st.VALUE
        INTO l_pga
        FROM v$mystat st, v$statname sn
       WHERE st.statistic# = sn.statistic# AND sn.name = 'session pga memory';

      RETURN l_pga;
   END;

   PROCEDURE initialize (context_in IN VARCHAR2)
   IS
   BEGIN
      DELETE FROM tickers;

      COMMIT;
      DBMS_OUTPUT.put_line (context_in);
      last_timing := DBMS_UTILITY.get_cpu_time;
      last_pga := pga_consumed;
   END;

   PROCEDURE show_results (message_in IN VARCHAR2 := NULL)
   IS
      l_count   INTEGER;
   BEGIN
      SELECT COUNT (*) INTO l_count FROM tickers;

      DBMS_OUTPUT.put_line ('Ticker row count: ' || l_count);
      DBMS_OUTPUT.put_line (
            '"'
         || message_in
         || '" completed in: '
         || TO_CHAR (DBMS_UTILITY.get_cpu_time - last_timing)
         || ' centisecs; pga at: '
         || TO_CHAR (pga_consumed () - last_pga)
         || ' bytes');
   END;
END;
/

Line(s)  Description
11–14    Query the v$ views to retrieve the current value for PGA memory consumption in my session.
19–28    Get things ready to test. Clear out the TICKERS table; get the start time and the starting level of PGA memory consumption.
30–45    Post the execution of the code. Get the number of rows in the TICKERS table, calculate and display the number of elapsed centiseconds (hundredths of a second), and calculate and display the change in PGA memory.

Now I am ready to run my test code as follows:

  • Initialize starting points in my session.
  • Call the pipelined version of the doubled function (doubled_pl) to insert rows into the TICKERS table. But add a ROWNUM clause to say, “I want only the first 9 rows.”
  • Do the same for the nonpipelined version of the function (doubled_nopl).

BEGIN
   utils.initialize ('Pipelined');

   INSERT INTO tickers
      SELECT *
        FROM TABLE (doubled_pl (CURSOR (SELECT * FROM stocks)))
       WHERE ROWNUM < 10;

   utils.show_results ('First 9 rows');

   utils.initialize ('Not Pipelined');

   INSERT INTO tickers
      SELECT *
        FROM TABLE (doubled_nopl (CURSOR (SELECT * FROM stocks)))
       WHERE ROWNUM < 10;

   utils.show_results ('First 9 rows');
END;
/

Here are the results I got for an initial load of 200,000 rows into the STOCKS table.

Pipelined
Ticker row count: 9
"First 9 rows" completed in: 8 centisecs; pga at: 528345 bytes

Not Pipelined
Ticker row count: 9
"First 9 rows" completed in: 93 centisecs; pga at: 96206848 bytes

The significantly shorter response time with the pipelined function demonstrates clearly that the INSERT-SELECT statement was able to keep track of the rows returned by the function. As soon as nine rows were passed back, the SQL engine terminated execution of the pipelined table function and inserted the rows.

With the nonpipelined version, I have to wait for every row in the STOCKS table to be doubled (consuming lots of PGA memory as well). Then all those rows are passed back to the SELECT statement, at which point the SQL engine says, “Well, I wanted just the first nine” and throws away the rest.

Very inefficient.

From a memory standpoint, the nonpipelined table function consumes much more PGA memory than the pipelined version. That should make perfect sense to you, because the pipelined version does not declare and fill up a collection to be returned by the function.

No TABLE Clause Needed

As of Oracle Database 12c Release 2, you no longer need to include the TABLE clause with table functions. This query, calling the strings table function, will work just as well:

SELECT COLUMN_VALUE my_string
  FROM strings ()
/

The NO_DATA_NEEDED Exception

Sometimes—as in the performance and memory test in the previous section—you will want to terminate the pipelined table function before all rows have been piped back. Oracle Database will then raise the NO_DATA_NEEDED exception. This will terminate the function but will not terminate the SELECT statement that called it.

So for the most part, you don’t have to worry about this exception. You do need to explicitly handle this exception if either of the following applies:

  • You include an OTHERS exception handler in a block that includes a PIPE ROW statement.
  • Your code that feeds a PIPE ROW statement must be followed by a cleanup procedure. Typically, the cleanup procedure releases resources the code no longer needs.

Let’s explore this behavior in more detail. In this first example, the query asks for only one row, so Oracle Database raises NO_DATA_NEEDED inside the function, but no error is reported to the calling query.

CREATE OR REPLACE TYPE strings_t IS TABLE OF VARCHAR2 (100);
/

CREATE OR REPLACE FUNCTION strings
   RETURN strings_t
   PIPELINED
   AUTHID DEFINER
IS
BEGIN
   PIPE ROW (1);
   PIPE ROW (2);
   RETURN;
END;
/

SELECT COLUMN_VALUE my_string
  FROM TABLE (strings ())
 WHERE ROWNUM < 2
/

MY_STRING
—————————
1

Now I add an OTHERS exception handler and nothing else.

CREATE OR REPLACE FUNCTION strings
   RETURN strings_t
   PIPELINED
   AUTHID DEFINER
IS
BEGIN
   PIPE ROW (1);
   PIPE ROW (2);
   RETURN;
EXCEPTION
   WHEN OTHERS
   THEN
      DBMS_OUTPUT.put_line ('Error: ' || SQLERRM);
      RAISE;
END;
/

SELECT COLUMN_VALUE my_string
  FROM TABLE (strings ())
 WHERE ROWNUM < 2
/

MY_STRING
—————————
1

Error: ORA-06548: no more rows needed

As you can see, the NO_DATA_NEEDED error is trapped by that handler and the reraise does not manifest as an error in the SELECT statement. The problem with taking this approach, though, is that your OTHERS handler might contain specific cleanup code that makes sense for unexpected failures but not for early termination of data piping. I therefore recommend that you include a specific handler for NO_DATA_NEEDED.
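
A minimal sketch of that recommendation follows. It assumes the strings_t type from earlier; the function name strings_handled is hypothetical, chosen so that it does not overwrite the strings function used in the surrounding examples.

```sql
CREATE OR REPLACE FUNCTION strings_handled
   RETURN strings_t
   PIPELINED
   AUTHID DEFINER
IS
BEGIN
   PIPE ROW ('1');
   PIPE ROW ('2');
   RETURN;
EXCEPTION
   WHEN NO_DATA_NEEDED
   THEN
      -- The query stopped fetching early. This is not a failure,
      -- so skip the error logging and simply reraise.
      RAISE;
   WHEN OTHERS
   THEN
      -- Genuinely unexpected errors are still logged and reraised.
      DBMS_OUTPUT.put_line ('Error: ' || SQLERRM);
      RAISE;
END;
/
```

With the specific handler in place, running the same ROWNUM < 2 query against this function no longer goes through the OTHERS handler when the query stops early.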

In the code below, I demonstrate that both NO_DATA_FOUND and NO_DATA_NEEDED are by default “hidden” from the calling query but other exceptions such as PROGRAM_ERROR result in termination of the SQL statement.

CREATE OR REPLACE FUNCTION strings (err_in IN VARCHAR2)
   RETURN strings_t
   PIPELINED
   AUTHID DEFINER
IS
BEGIN
   PIPE ROW (1);

   CASE err_in
      WHEN 'no_data_found'
      THEN
         RAISE NO_DATA_FOUND;
      WHEN 'no_data_needed'
      THEN
         RAISE NO_DATA_NEEDED;
      WHEN 'program_error'
      THEN
         RAISE PROGRAM_ERROR;
   END CASE;

   RETURN;
END;
/

SELECT COLUMN_VALUE my_string FROM TABLE (strings ('no_data_found'))
/

MY_STRING
—————————
1

SELECT COLUMN_VALUE my_string FROM TABLE (strings ('no_data_needed'))
/

MY_STRING
—————————
1

SELECT COLUMN_VALUE my_string FROM TABLE (strings ('program_error'))
/

MY_STRING
—————————
1
ORA-06501: PL/SQL: program error

The basic takeaway regarding NO_DATA_NEEDED is not to worry about it unless you are providing a WHEN OTHERS handler in your pipelined table function. In such a case, make sure to provide a handler for NO_DATA_NEEDED, simply reraising the exception with a RAISE statement.
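
The other case listed earlier, code that must clean up after itself, follows the same pattern. The function below is a sketch under stated assumptions: stock_tickers and its nested cleanup procedure are hypothetical names, and the explicit cursor simply stands in for any resource that needs to be released. It reuses the strings_t type and the STOCKS table from this article and fetches row by row only to keep the example short.

```sql
CREATE OR REPLACE FUNCTION stock_tickers
   RETURN strings_t
   PIPELINED
   AUTHID DEFINER
IS
   CURSOR stocks_cur
   IS
      SELECT ticker FROM stocks;

   l_ticker   stocks.ticker%TYPE;

   -- Release whatever the function acquired (here, just the cursor).
   PROCEDURE cleanup
   IS
   BEGIN
      IF stocks_cur%ISOPEN
      THEN
         CLOSE stocks_cur;
      END IF;
   END;
BEGIN
   OPEN stocks_cur;

   LOOP
      FETCH stocks_cur INTO l_ticker;
      EXIT WHEN stocks_cur%NOTFOUND;
      PIPE ROW (l_ticker);
   END LOOP;

   cleanup;
   RETURN;
EXCEPTION
   WHEN NO_DATA_NEEDED
   THEN
      -- The query needs no more rows: clean up, then reraise.
      cleanup;
      RAISE;
END;
/
```

A query such as SELECT COLUMN_VALUE FROM TABLE (stock_tickers ()) WHERE ROWNUM < 3 stops fetching early, so the cleanup in this sketch would run from the NO_DATA_NEEDED handler rather than from the normal path.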

Package-Based Types Implicitly Declared

With pipelined table functions only, your types can be defined in the specification of a package, as opposed to in the schema with independent CREATE OR REPLACE statements. You can even declare your nested table as a collection of record types.

CREATE TABLE stocks2
(
   ticker        VARCHAR2 (20),
   trade_date    DATE,
   open_price    NUMBER,
   close_price   NUMBER
)
/

CREATE OR REPLACE PACKAGE pkg
   AUTHID DEFINER
AS
   TYPE stocks_nt IS TABLE OF stocks2%ROWTYPE;

   FUNCTION stock_rows
      RETURN stocks_nt
      PIPELINED;
END;
/

CREATE OR REPLACE PACKAGE BODY pkg
AS
   FUNCTION stock_rows
      RETURN stocks_nt
      PIPELINED
   IS
      l_stock   stocks2%ROWTYPE;
   BEGIN
      l_stock.ticker := 'ORCL';
      l_stock.open_price := 100;
      PIPE ROW (l_stock);
      RETURN;
   END;
END;
/

SELECT ticker, open_price FROM TABLE (pkg.stock_rows ())
/

This is more convenient, but behind the scenes, Oracle Database is creating types implicitly for you, as you can see below.

SELECT object_name, object_type
  FROM user_objects
 WHERE object_type IN ('TYPE', 'PACKAGE', 'PACKAGE BODY') 
/

OBJECT_NAME                 OBJECT_TYPE
——————————————————————————  ———————————
SYS_PLSQL_B85654CF_DUMMY_1  TYPE
SYS_PLSQL_B85654CF_9_1      TYPE
SYS_PLSQL_4DF80292_DUMMY_1  TYPE
SYS_PLSQL_4DF80292_30_1     TYPE
PKG                         PACKAGE
PKG                         PACKAGE BODY

Summary

Pipelined table functions are something of an oddity in PL/SQL. They pass data back to the calling query even before the function is completed, and they don’t pass back anything but control with the RETURN statement. You cannot call a PTF from within PL/SQL itself, only in the FROM clause of a SELECT statement.

But those oddities reflect the power of this special type of function: improved performance and reduced memory consumption compared to normal (nonpipelined) table functions.

Next Steps

READ documentation about creating pipelined table functions.

READ more about pipelined table functions (ORACLE-BASE).

LEARN more about PL/SQL table functions (a Dev Gym class).

Illustration by Wes Rowell