In my two previous articles “When Is a Function Like a Table? When It’s a Table Function!” and “Streaming Table Functions,” I introduced table functions—functions that can be called in the FROM clause of a SELECT statement as if they were tables—and then took a closer look at streaming table functions. In this article, I explore a very special kind of table function: the pipelined table function.
Pipelined table functions are table functions that return or “pipe” rows back to the calling query as the function produces the data in the desired form—and before the function has completed all of its processing.
Before we dive into the implementation and application of pipelined table functions, it is important to understand how unusual the above statement is. PL/SQL is not a multithreaded language. Generally, when a PL/SQL block (anonymous, nested, procedure function, and so on) is invoked, further processing in that session is “on hold” (suspended) until the block returns control to the host that invoked the block—whether that host is another PL/SQL block, a SQL statement, or a host language such as Java.
Normal (nonpipelined) table functions act in precisely this way. Each time the table function is invoked in the FROM clause, the SQL engine must wait until a RETURN statement is executed to pass back the collection to the SELECT statement for conversion into rows and columns.
This blocking behavior can have a negative impact on overall performance of the SELECT statement, especially in the ETL (extract, transform, and load) operations of a data warehouse. In addition, with each element added to the collection in the table function, more and more Program Global Area (PGA) memory is consumed. For very large datasets, this can lead to further performance degradation and even errors.
Pipelined table functions get around both of these problems. Let’s take a look.
A Very Simple Example
Let’s start our exploration of pipelined table functions (which I also refer to as PTFs in this article) by comparing a very simple regular (nonpipelined) table function with a very simple PTF.
First, I create a schema-level nested table type of strings.
CREATE OR REPLACE TYPE strings_t IS TABLE OF VARCHAR2 (100); /
Next, I compile a table function that returns a nested table of that type with a single string inside it:
CREATE OR REPLACE FUNCTION strings RETURN strings_t AUTHID DEFINER IS l_strings strings_t := strings_t ('abc'); BEGIN RETURN l_strings; END; /
When I call the table function in a SELECT statement, it returns abc:
SELECT COLUMN_VALUE my_string FROM TABLE (strings ()) / MY_STRING ————————— abc
Now I will create a pipelined version of that same table function.
CREATE OR REPLACE FUNCTION strings_pl RETURN strings_t PIPELINED AUTHID DEFINER IS BEGIN PIPE ROW ('abc'); RETURN; END; /
And when I run it, I see the same results:
SELECT COLUMN_VALUE my_string FROM TABLE (strings_pl ()) / MY_STRING ————————— abc
Now let’s explore the differences between the code in these regular and pipelined table functions:
|Regular table functions||Pipelined table functions|
|No keyword in header||PIPELINED appears in header|
|Declares local collection||No declaration of a local collection|
|Populates collection with string||Passes string to PIPE ROW|
|Returns the collection||Returns nothing but control|
And there’s one other big difference between regular and pipelined table functions: regular table functions can be called in PL/SQL, but pipelined table functions can be called only from within a SELECT statement, as you can see in the error message below:
DECLARE l_strings strings_t := strings_t (); BEGIN l_strings := strings_pl (); END; / PLS-00653: aggregate/table functions are not allowed in PL/SQL scope
This should not be entirely unexpected. Because PL/SQL is not a multithreaded language, it cannot accept rows that are “piped” back before the function terminates execution.
Of course, in this incredibly basic pipelined table function, there will be no issue of blocking or memory consumption. But it gets across the basic differences in the process flow of a pipelined table function:
A Nontrivial Pipelined Table Function
Before taking a look at how pipelined table functions can help improve performance over nonpipelined table functions and also reduce PGA memory consumption, let’s build a more interesting PTF.
My transformation in this example is fairly trivial—and could even be handled with a table function, with “pure SQL.” The data transformation requirement is to split each row in the STOCKS table into two rows in the TICKERS table.
I create the two tables:
CREATE TABLE stocks ( ticker VARCHAR2 (20), trade_date DATE, open_price NUMBER, close_price NUMBER ) / CREATE TABLE tickers ( ticker VARCHAR2 (20), pricedate DATE, pricetype VARCHAR2 (1), price NUMBER ) /
I then create the object type and nested table type needed for the table functions:
CREATE TYPE ticker_ot AS OBJECT ( ticker VARCHAR2 (20), pricedate DATE, pricetype VARCHAR2 (1), price NUMBER ); / CREATE TYPE tickers_nt AS TABLE OF ticker_ot; /
As discussed in the streaming table function article, I will need to pass a dataset to the table functions, and for that I need a REF CURSOR type:
CREATE OR REPLACE PACKAGE stock_mgr AUTHID DEFINER IS TYPE stocks_rc IS REF CURSOR RETURN stocks%ROWTYPE; END stock_mgr; /
I am now ready to create my table functions. The nonpipelined version is shown in Listing 1. It follows the typical streaming table function pattern:
Now let’s see how this pattern unfolds in my doubled_nopl function—one STOCKS row doubled to two TICKER rows—and the line descriptions that follow.
Listing 1: Nonpipelined table function
CREATE OR REPLACE FUNCTION doubled_nopl (rows_in stock_mgr.stocks_rc) RETURN tickers_nt AUTHID DEFINER IS TYPE stocks_aat IS TABLE OF stocks%ROWTYPE INDEX BY PLS_INTEGER; l_stocks stocks_aat; l_doubled tickers_nt := tickers_nt (); BEGIN LOOP FETCH rows_in BULK COLLECT INTO l_stocks LIMIT 100; EXIT WHEN l_stocks.COUNT = 0; FOR l_row IN 1 .. l_stocks.COUNT LOOP l_doubled.EXTEND; l_doubled (l_doubled.LAST) := ticker_ot (l_stocks (l_row).ticker, l_stocks (l_row).trade_date, 'O', l_stocks (l_row).open_price); l_doubled.EXTEND; l_doubled (l_doubled.LAST) := ticker_ot (l_stocks (l_row).ticker, l_stocks (l_row).trade_date, 'C', l_stocks (l_row).close_price); END LOOP; END LOOP; RETURN l_doubled; END;
|1||Use the REF CURSOR type defined in the package for the rows passed in. Because I am selecting from the STOCKS table, I use the stocks_rc type.|
|2||Return an array, each of whose elements looks just like a row in the TICKERS table.|
|5–6||Declare an associative array to hold rows fetched from the rows_in cursor variable.|
|8||Declare the local variable to be filled and then returned back to the SELECT statement.|
|12||Start up a simple loop to fetch rows from the cursor variable. It’s already open—the CURSOR expression takes care of that.|
|13–15||Use the BULK COLLECT feature to retrieve as many as 100 rows with each fetch. I do this to avoid row-by-row processing, which is not efficient enough. Exit the loop when the associative array is empty.|
|17||For each element in the array (row from the cursor variable)...|
|19–24||Use EXTEND to add another element at the end of the nested table, then call the object type constructor to create the first (“opening”) of the two rows for the TICKERS table, and put the element into the new last index value of the collection.|
|26–31||Do the same for the second (“closing”) row of the TICKERS table.|
|35||Send the nested table back to the SELECT statement for streaming.|
Listing 2: Pipelined table function
CREATE OR REPLACE FUNCTION doubled_pl (rows_in stock_mgr.stocks_rc) RETURN tickers_nt PIPELINED AUTHID DEFINER IS TYPE stocks_aat IS TABLE OF stocks%ROWTYPE INDEX BY PLS_INTEGER; l_stocks stocks_aat; BEGIN LOOP FETCH rows_in BULK COLLECT INTO l_stocks LIMIT 100; EXIT WHEN l_stocks.COUNT = 0; FOR l_row IN 1 .. l_stocks.COUNT LOOP PIPE ROW (ticker_ot (l_stocks (l_row).ticker, l_stocks (l_row).trade_date, 'O', l_stocks (l_row).open_price)); PIPE ROW (ticker_ot (l_stocks (l_row).ticker, l_stocks (l_row).trade_date, 'C', l_stocks (l_row).close_price)); END LOOP; END LOOP; RETURN; END;
|1||Because this is a streaming table function, it should come as no surprise that the parameter to the function is a cursor variable passing in rows of data from the invoking SELECT statement.|
|3||Specify that this is a pipelined table function.|
|6–7||Declare the associative array used to retrieve rows fetched from the cursor variable.|
|12–14||Fetch the next 100 rows; stop when the collection is empty.|
|16||For each row of data moved to the collection…|
|18–21||Use the object type constructor function to “transfer” opening-price data from the element in the collection to the attributes of the object type instance. Then immediately send that data back to the calling query with PIPE ROW.|
|22–25||Do the same thing for the closing-price data.|
|29||Return control back to the query.|
Before exploring the impact on performance and memory, let’s verify that this pipelined table function can be used in a SELECT statement just like the nonpipelined version.
SELECT COUNT (*) FROM stocks / 100000 INSERT INTO tickers SELECT * FROM TABLE (doubled_pl (CURSOR (SELECT * FROM stocks))) / SELECT COUNT (*) FROM tickers / 200000
OK, that looks good.
Impact of Switch to Pipelined Table Functions
Let’s compare the behavior of streaming table functions, nonpipelined and pipelined. As discussed in my previous article on table functions, a streaming table function streams data directly from one process or transformation to the next without intermediate staging. It is a common use case in data warehouses.
Now let’s prove that the SQL engine is able to take those piped rows and put them immediately to work—and use less PGA memory along the way.
Proofs require hard facts, so I will construct a small utilities package to measure elapsed CPU time and memory consumption.
Note: For this package to compile in your schema, you will need the SELECT privilege on v$mystat and v$statname.
First, I create my API to the utility’s functionality:
CREATE OR REPLACE PACKAGE utils IS PROCEDURE initialize (context_in IN VARCHAR2); PROCEDURE show_results (message_in IN VARCHAR2 := NULL); END; /
Then I compile the utility’s package body. Listing 3 includes the code, and a description of all the interesting bits follows.
Listing 3: utils package body code
CREATE OR REPLACE PACKAGE BODY utils IS last_timing INTEGER := NULL; last_pga INTEGER := NULL; FUNCTION pga_consumed RETURN NUMBER AS l_pga NUMBER; BEGIN SELECT st.VALUE INTO l_pga FROM v$mystat st, v$statname sn WHERE st.statistic# = sn.statistic# AND sn.name = 'session pga memory'; RETURN l_pga; END; PROCEDURE initialize (context_in IN VARCHAR2) IS BEGIN DELETE FROM tickers; COMMIT; DBMS_OUTPUT.put_line (context_in); last_timing := DBMS_UTILITY.get_cpu_time; last_pga := pga_consumed; END; PROCEDURE show_results (message_in IN VARCHAR2 := NULL) IS l_count INTEGER; BEGIN SELECT COUNT (*) INTO l_count FROM tickers; DBMS_OUTPUT.put_line ('Ticker row count: ' || l_count); DBMS_OUTPUT.put_line ( '"' || message_in || '" completed in: ' || TO_CHAR (DBMS_UTILITY.get_cpu_time - last_timing) || ' centisecs; pga at: ' || TO_CHAR (pga_consumed () - last_pga) || ' bytes'); END; END; /
|11–14||Query the v$ views to retrieve the current value for PGA memory consumption in my session.|
|19–28||Get things ready to test. Clear out the TICKERS table; get the start time and the starting level of PGA memory consumption.|
|30–45||Post the execution of the code. Get the number of rows in the TICKERS table, calculate and display the number of elapsed centiseconds (hundredths of a second), and calcu-late and display the change in PGA memory.|
BEGIN utils.initialize ('Pipelined'); INSERT INTO tickers SELECT * FROM TABLE (doubled_pl (CURSOR (SELECT * FROM stocks))) WHERE ROWNUM < 10; utils.show_results ('First 9 rows'); utils.initialize ('Not Pipelined'); INSERT INTO tickers SELECT * FROM TABLE (doubled_nopl (CURSOR (SELECT * FROM stocks))) WHERE ROWNUM < 10; utils.show_results ('First 9 rows'); END; /
Here are the results I got for an initial load of 200,000 rows into the STOCKS table.
Pipelined "First 9 rows" completed in: 8 centisecs; pga at: 528345 bytes Ticker row count: 9 Not Pipelined "First 9 rows" completed in: 93 centisecs; pga at: 96206848 bytes Ticker row count: 9
The significantly shorter response time with the pipelined function demonstrates clearly that the INSERT-SELECT statement was able to keep track of the rows returned by the function. As soon as nine rows were passed back, the SQL engine terminated execution of the pipelined table function and inserted the rows.
No Clause Needed
As of Oracle Database 12c Release 2, you no longer need to include the TABLE clause with table functions. This query, calling the strings table function, will work just as well:
SELECT COLUMN_VALUE my_string FROM strings ()
With the nonpipelined version, I have to wait for 10,000 rows to be doubled into 20,000 rows (consuming lots of PGA memory as well). Then all those rows are passed back to the SELECT statement, at which point the SQL engine says, “Well, I wanted just the first nine” and throws away the rest.
From a memory standpoint, the nonpipelined table function consumes much more PGA memory than the pipelined version. That should make perfect sense to you, because I do not declare and fill up a collection to be returned by the function.
The NO_DATA_NEEDED Exception
Sometimes—as in the performance and memory test in the previous section—you will want to terminate the pipelined table function before all rows have been piped back. Oracle Database will then raise the NO_DATA_NEEDED exception. This will terminate the function but will not terminate the SELECT statement that called it.
So for the most part, you don’t have to worry about this exception. You do need to explicitly handle this exception if either of the following applies:
Let’s explore this behavior in more detail. In this first example, I use only one row, so Oracle Database raises NO_DATA_NEEDED but no exception is raised.
CREATE OR REPLACE TYPE strings_t IS TABLE OF VARCHAR2 (100); / CREATE OR REPLACE FUNCTION strings RETURN strings_t PIPELINED AUTHID DEFINER IS BEGIN PIPE ROW (1); PIPE ROW (2); RETURN; END; / SELECT COLUMN_VALUE my_string FROM TABLE (strings ()) WHERE ROWNUM < 2 / MY_STRING ————————— 1
Now I add an OTHERS exception handler and nothing else.
CREATE OR REPLACE FUNCTION strings RETURN strings_t PIPELINED AUTHID DEFINER IS BEGIN PIPE ROW (1); PIPE ROW (2); RETURN; EXCEPTION WHEN OTHERS THEN DBMS_OUTPUT.put_line ('Error: ' || SQLERRM); RAISE; END; / SELECT COLUMN_VALUE my_string FROM TABLE (strings ()) WHERE ROWNUM < 2 / MY_STRING ————————— 1 Error: ORA-06548: no more rows needed
As you can see, the NO_DATA_NEEDED error is trapped by that handler and the reraise does not manifest as an error in the SELECT statement. The problem with taking this approach, though, is that your OTHERS handler might contain specific cleanup code that makes sense for unexpected failures but not for early termination of data piping. I therefore recommend that you include a specific handler for NO_DATA_NEEDED.
In the code below, I demonstrate that both NO_DATA_FOUND and NO_DATA_NEEDED are by default “hidden” from the calling query but other exceptions such as PROGRAM_ERROR result in termination of the SQL statement.
CREATE OR REPLACE FUNCTION strings (err_in IN VARCHAR2) RETURN strings_t PIPELINED AUTHID DEFINER IS BEGIN PIPE ROW (1); CASE err_in WHEN 'no_data_found' THEN RAISE NO_DATA_FOUND; WHEN 'no_data_needed' THEN RAISE no_data_needed; WHEN 'program_error' THEN RAISE PROGRAM_ERROR; END CASE; RETURN; END; / SELECT COLUMN_VALUE my_string FROM TABLE (strings ('no_data_found')) / MY_STRING ————————— 1 SELECT COLUMN_VALUE my_string FROM TABLE (strings ('no_data_needed')) / MY_STRING ————————— 1 SELECT COLUMN_VALUE my_string FROM TABLE (strings ('program_error')) / MY_STRING ————————— 1 ORA-06501: PL/SQL: program error
The basic takeaway regarding NO_DATA_NEEDED is not to worry about it unless you are providing a WHEN OTHERS handler in your pipelined table function. In such a case, make sure to provide a handler for NO_DATA_NEEDED, simply reraising the exception with a RAISE statement.
Package-Based Types Implicitly Declared
With pipelined table functions only, your types can be defined in the specification of a package, as opposed to in the schema with independent CREATE OR REPLACE statements. You can even declare your nested table as a collection of record types.
CREATE TABLE stocks2 ( ticker VARCHAR2 (20), trade_date DATE, open_price NUMBER, close_price NUMBER ) / CREATE OR REPLACE PACKAGE pkg AUTHID DEFINER AS TYPE stocks_nt IS TABLE OF stocks2%ROWTYPE; FUNCTION stock_rows RETURN stocks_nt PIPELINED; END; / CREATE OR REPLACE PACKAGE BODY pkg AS FUNCTION stock_rows RETURN stocks_nt PIPELINED IS l_stock stocks2%ROWTYPE; BEGIN l_stock.ticker := 'ORCL'; l_stock.open_price := 100; PIPE ROW (l_stock); RETURN; END; END; / SELECT ticker, open_price FROM TABLE (pkg.stock_rows ()) /
This is more convenient, but behind the scenes, Oracle Database is creating types implicitly for you, as you can see below.
SELECT object_name, object_type FROM user_objects WHERE object_type IN ('TYPE', 'PACKAGE', 'PACKAGE BODY') / OBJECT_NAME OBJECT_TYPE —————————————————————————— ——————————— SYS_PLSQL_B85654CF_DUMMY_1 TYPE SYS_PLSQL_B85654CF_9_1 TYPE SYS_PLSQL_4DF80292_DUMMY_1 TYPE SYS_PLSQL_4DF80292_30_1 TYPE PKG PACKAGE PKG PACKAGE BODY
Pipelined table functions are something of an oddity in PL/SQL. They pass data back to the calling query even before the function is completed, and they don’t pass back anything but control with the RETURN statement. You cannot call a PTF from within PL/SQL itself, only in the FROM clause of a SELECT statement.
But those oddities reflect the power of this special type of function: improved performance and reduced memory consumption compared to normal (nonpipelined) table functions.
READ documentation about creating pipelined table functions.
READ more about pipelined table functions (ORACLE-BASE).
LEARN more about PL/SQL table functions (a Dev Gym class).
Illustration by Wes Rowell