SET NUMWIDTH 12;
COL CLASS_NAME FORMAT A25;
COL STAT_NAME FORMAT A40;
SET LINES 300;
SET PAGES 1000;
SET SQLN OFF;
SET SQLPROMPT "";
SET ESCAPE ON;
SET MARKUP HTML ON SPOOL ON PREFORMAT ON ENTMAP OFF HEAD "
EXAMPLE PIPELINED SQL FUNCTION";
SPOOL C:\\temp\\pipelined_sql_example.html
PROMPT
Example of How To Use a Pipelined SQL Function
;
SET ECHO ON;
ALTER SESSION ENABLE PARALLEL DML;
DROP TABLE metrics;
DROP SEQUENCE metrics_seq;
DROP TYPE tbl_metrics_type;
DROP TYPE rec_metrics_type;
DROP DIMENSION metrics_dim;
CREATE SEQUENCE metrics_seq START WITH 1 INCREMENT BY 1;
CREATE TABLE metrics
(metric_id NUMBER NOT NULL CONSTRAINT metrics_pk PRIMARY KEY NOVALIDATE,
class_id NUMBER NOT NULL,
class_name VARCHAR2(120) NOT NULL,
stat_id NUMBER NOT NULL,
stat_name VARCHAR2(120) NOT NULL);
ALTER TABLE metrics ADD CONSTRAINT metrics_uk UNIQUE (class_id,class_name,stat_id,stat_name) VALIDATE;
CREATE OR REPLACE TYPE rec_metrics_type IS OBJECT (
metric_id NUMBER,
class_id NUMBER,
class_name VARCHAR2 (120),
stat_id NUMBER,
stat_name VARCHAR2 (120)
);
/
CREATE OR REPLACE TYPE tbl_metrics_type IS TABLE OF rec_metrics_type;
/
CREATE DIMENSION metrics_dim
LEVEL class_level IS (metrics.class_id)
LEVEL stat_level IS (metrics.stat_id)
HIERARCHY metric_hier (stat_level CHILD OF class_level)
ATTRIBUTE class_level DETERMINES (metrics.class_name)
ATTRIBUTE stat_level DETERMINES (metrics.stat_name);
CREATE OR REPLACE VIEW metrics_vw
AS
SELECT /*+ PARALLEL (wss, 2) */ s.CLASS class_id,
DECODE (s.CLASS,
1, 'User',
2, 'Redo',
4, 'Enqueue',
8, 'Cache',
16, 'OS',
32, 'Parallelism',
40, 'Real Application Clusters',
64, 'SQL',
72, 'Internal Debugging',
128, 'Debug'
) class_name,
wss.stat_id, s.NAME
FROM SYS.wrh$_service_stat wss INNER JOIN v$sysstat s
ON (wss.stat_id = s.stat_id)
ORDER BY s.CLASS;
CREATE OR REPLACE PACKAGE load_dim_pkg
AS
TYPE metrics_cur IS REF CURSOR RETURN metrics_vw%ROWTYPE;
FUNCTION get_data (p_cur IN load_dim_pkg.metrics_cur) RETURN tbl_metrics_type
PARALLEL_ENABLE (PARTITION p_cur BY ANY)
PIPELINED;
END;
/
SHOW ERRORS;
CREATE OR REPLACE PACKAGE BODY load_dim_pkg
AS
FUNCTION get_data (p_cur IN load_dim_pkg.metrics_cur) RETURN tbl_metrics_type
PARALLEL_ENABLE (PARTITION p_cur BY ANY)
PIPELINED IS
rec_in rec_metrics_type := rec_metrics_type (NULL, NULL, NULL, NULL, NULL);
rec_out rec_metrics_type := rec_metrics_type (NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p_cur INTO rec_in.class_id, rec_in.class_name, rec_in.stat_id,rec_in.stat_name;
EXIT WHEN p_cur%NOTFOUND;
--Transformations may be performed here
rec_out.metric_id := NULL;
rec_out.class_id := rec_in.class_id;
rec_out.class_name := INITCAP (rec_in.class_name);
rec_out.stat_id := rec_in.stat_id;
rec_out.stat_name := INITCAP (rec_in.stat_name);
PIPE ROW (rec_out);
END LOOP;
RETURN;
END;
END;
/
SHOW ERRORS;
SET autotrace ON;
MERGE INTO metrics m USING
(SELECT *
FROM TABLE (load_dim_pkg.get_data (CURSOR (SELECT DISTINCT *
FROM metrics_vw)))) s1
ON (m.stat_id = s1.stat_id)
WHEN MATCHED THEN
UPDATE SET m.stat_name = s1.stat_name, m.class_id = s1.class_id, m.class_name= s1.class_name
WHEN NOT MATCHED THEN
INSERT VALUES (metrics_seq.nextval, s1.class_id , s1.class_name , s1.stat_id, s1.stat_name);
SET autotrace OFF;
COMMIT;
SELECT * FROM V$PQ_SESSTAT WHERE STATISTIC IN ('Queries Parallelized','Server Threads');
SELECT * FROM METRICS ORDER BY CLASS_NAME;
SPOOL OFF;