ETL调度系统
发布时间:2021-01-02 07:35:40 所属栏目:编程 来源:网络整理
导读:今天PHP站长网 52php.cn把收集自互联网的代码分享给大家,仅供参考。 CREATE OR REPLACE PACKAGE PKG_ETL_CTL IS -- 调用主程序,调用存储过程 PROCEDURE SP_EXEC_PROC(I_JOB_ID NUMBER); -- 创建新的时间周期,运行数据
|
以下代码由PHP站长网 52php.cn收集自互联网 现在PHP站长网小编把它分享给大家,仅供参考 CREATE OR REPLACE PACKAGE PKG_ETL_CTL IS
-- 调用主程序,调用存储过程
PROCEDURE SP_EXEC_PROC(I_JOB_ID NUMBER);
-- 创建新的时间周期,运行数据周期作业及作业流状态处理
PROCEDURE SP_FLOW_RUN_DEAL;
-- 创建新的时间周期
PROCEDURE SP_FLOW_CREATE_NEW_PERIOD;
-- 运行数据周期内的作业流及作业
PROCEDURE SP_FLOW_RUN_NEW_PERIOD;
-- 同步作业流运行状态
PROCEDURE SP_FLOW_RUN_STATUS;
-- 失败作业流及作业重置为未处理
PROCEDURE SP_FLOW_RUN_ERROR_RESET;
-- 作业状态更新
PROCEDURE SP_JOB_RUN_STATUS
(
I_JOB_ID NUMBER,I_ORG_ID VARCHAR2,I_JOB_RUN_STATUS VARCHAR2,I_JOB_RUN_INFO VARCHAR2
);
-- 作业流日志处理
PROCEDURE SP_ETL_FLOW_LOG_INFO(I_FLOW_ID NUMBER);
-- 作业日志处理
PROCEDURE SP_ETL_JOB_LOG_INFO(I_JOB_ID NUMBER);
PROCEDURE SP_INSERT_MONITOR_SMS
(
O_RESULT_FLAG OUT VARCHAR2 /*过程执行结果返回给调度 9 成功 2 失败*/,O_RESULT_MSG OUT VARCHAR2 /*过程执行结果信息返回给调度*/
);
PROCEDURE SP_SEND_MONITOR_SMS;
-- 获取作业流前置依赖
FUNCTION FN_GET_FLOW_DEPEND(I_FLOW_ID NUMBER) RETURN VARCHAR2;
-- 获取作业前置依赖
FUNCTION FN_GET_JOB_DEPEND(I_JOB_ID NUMBER) RETURN VARCHAR2;
-- 获取作业流下属作业运行状态
FUNCTION FN_GET_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2;
-- 获取下个数据日期
FUNCTION FN_GET_NEXT_DATA_TIME(I_FLOW_ID NUMBER) RETURN DATE;
-- 获取上级作业流运行状态
FUNCTION FN_GET_SUPER_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2;
-- 获取上级作业流数据开始时间
FUNCTION FN_GET_SUPER_DATA_START_TIME(I_FLOW_ID NUMBER) RETURN DATE;
-- 获取上级作业流数据结束时间
FUNCTION FN_GET_SUPER_DATA_END_TIME(I_FLOW_ID NUMBER) RETURN DATE;
-- 获取周期代码
FUNCTION FN_GET_CYC_CODE(I_FLOW_ID VARCHAR2) RETURN VARCHAR2;
END PKG_ETL_CTL;
/
CREATE OR REPLACE PACKAGE BODY PKG_ETL_CTL IS
/*******************************************************************
程序名 :SP_EXEC_PROC
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 调用主程序,调用存储过程
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
PROCEDURE SP_EXEC_PROC(I_JOB_ID NUMBER) IS
VAR_PRO_NAME VARCHAR2(100);
VAR_DATA_START_TIME VARCHAR2(20);
VAR_DATA_END_TIME VARCHAR2(20);
VAR_SQL VARCHAR2(4000);
VAR_PARAMS VARCHAR2(1000);
VAR_ORG_ID VARCHAR2(10);
VAR_JOB_RUN_DESC VARCHAR2(100);
VAR_JOB_ERR_DESC VARCHAR2(100);
BEGIN
-- 获取作业正在运行描述
SELECT T.ETL_PARA_VAL INTO VAR_JOB_RUN_DESC FROM ETL_CTL_PARA T WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_RUN_DESC';
-- 获取作业运行失败描述
SELECT T.ETL_PARA_VAL INTO VAR_JOB_ERR_DESC FROM ETL_CTL_PARA T WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_ERR_DESC';
-- 获取作业所调用的存储过程,数据开始时间,数据结束时间
SELECT T.ETL_JOB_PROC,TO_CHAR(A.ETL_DATA_START_TIME,'YYYYMMDDHH24MISS'),TO_CHAR(A.ETL_DATA_END_TIME,'YYYYMMDDHH24MISS')
INTO VAR_PRO_NAME,VAR_DATA_START_TIME,VAR_DATA_END_TIME
FROM ETL_CTL_JOB_INFO T,ETL_JOB_RUN_STS A
WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
AND T.ETL_JOB_ID = I_JOB_ID;
-- 获取作业全部参数拼接到一起
FOR LOOP_PARAM IN (SELECT T.ETL_PARA_NAME,DECODE(T.ETL_PARA_TYPE,2,'TO_DATE(' || T.ETL_PARA_VAL || ',''YYYYMMDDHH24MISS'')' -- 日期类型参数转化成日期格式,T.ETL_PARA_VAL) ETL_PARA_VAL,T.ETL_PARA_TYPE
FROM ETL_JOB_PARA T
WHERE T.ETL_JOB_ID = I_JOB_ID) LOOP
-- 获取机构号
IF LOOP_PARAM.ETL_PARA_NAME = 'I_ORG_ID'
THEN
VAR_ORG_ID := LOOP_PARAM.ETL_PARA_VAL;
END IF;
-- 参数拼接
VAR_PARAMS := VAR_PARAMS || LOOP_PARAM.ETL_PARA_NAME || ' => ' || LOOP_PARAM.ETL_PARA_VAL || ',';
END LOOP;
-- 参数加上输出参数(存储过程运行结果和运行信息)
VAR_PARAMS := UPPER(VAR_PARAMS) || 'O_RESULT_FLAG => LO_RESULT_FLAG,O_RESULT_MSG => LO_RESULT_MSG';
-- 参数替换为变量
VAR_PARAMS := REPLACE(VAR_PARAMS,'#$I_DATA_START_TIME#',VAR_DATA_START_TIME);
VAR_PARAMS := REPLACE(VAR_PARAMS,'#$I_DATA_END_TIME#',VAR_DATA_END_TIME);
-- 拼接存储过程进行调用
VAR_SQL := 'DECLARE LO_RESULT_FLAG VARCHAR2(10);LO_RESULT_MSG VARCHAR2(300);BEGIN ' || VAR_PRO_NAME || '(' ||
VAR_PARAMS || ');PKG_ETL_CTL.SP_JOB_RUN_STATUS(' || I_JOB_ID || ',''' || VAR_ORG_ID ||
''',LO_RESULT_FLAG,LO_RESULT_MSG);END;';
-- 修改作业状态为正在运行
SP_JOB_RUN_STATUS(I_JOB_ID,VAR_ORG_ID,'1',VAR_JOB_RUN_DESC);
-- 运行存储过程
EXECUTE IMMEDIATE VAR_SQL;
EXCEPTION
WHEN OTHERS THEN
-- 运行出错,返回错误信息
SP_JOB_RUN_STATUS(I_JOB_ID,'2',SQLERRM);
END;
/*******************************************************************
程序名 :SP_FLOW_RUN_DEAL
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 创建新的时间周期,运行数据周期作业及作业流状态处理
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
PROCEDURE SP_FLOW_RUN_DEAL IS
BEGIN
-- 创建新的时间周期
SP_FLOW_CREATE_NEW_PERIOD;
-- 运行时间周期内的作业
SP_FLOW_RUN_NEW_PERIOD;
-- 更新作业流状态
SP_FLOW_RUN_STATUS;
-- 失败任务重置
SP_FLOW_RUN_ERROR_RESET;
END;
/*******************************************************************
程序名 :SP_FLOW_CREATE_NEW_PERIOD
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 生成新的数据周期运行新周期的数据
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
PROCEDURE SP_FLOW_CREATE_NEW_PERIOD IS
DTE_DATA_NEXT_TIME DATE;
DTE_DATA_START_TIME DATE;
DTE_DATA_END_TIME DATE;
BEGIN
FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,A.ETL_NEXT_EXPIRY_TIME
FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A
WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
AND T.ETL_FLOW_LEVEL = 1
AND T.ETL_FLOW_STATUS = 1) LOOP
DTE_DATA_NEXT_TIME := FN_GET_NEXT_DATA_TIME(LOOP_FLOW.ETL_FLOW_ID);
IF DTE_DATA_NEXT_TIME <= LOOP_FLOW.ETL_NEXT_EXPIRY_TIME
THEN
UPDATE ETL_FLOW_RUN_STS T
SET T.ETL_NEXT_DATA_TIME = DTE_DATA_NEXT_TIME
WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
COMMIT;
END IF;
END LOOP;
FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,T.ETL_DATA_SUCC_TIME,T.ETL_DATA_START_TIME,T.ETL_DATA_END_TIME,T.ETL_NEXT_DATA_TIME,T.ETL_FLOW_RUN_STATUS,A.ETL_CYC_CODE,A.ETL_FLOW_LEVEL,A.ETL_FLOW_STATUS
FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A
WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
AND A.ETL_FLOW_STATUS = 1
ORDER BY A.ETL_FLOW_LEVEL) LOOP
IF LOOP_FLOW.ETL_FLOW_LEVEL = 1
AND LOOP_FLOW.ETL_FLOW_STATUS = 1
AND LOOP_FLOW.ETL_FLOW_RUN_STATUS = 9
AND LOOP_FLOW.ETL_DATA_SUCC_TIME < LOOP_FLOW.ETL_NEXT_DATA_TIME
THEN
UPDATE ETL_FLOW_RUN_STS T
SET T.ETL_DATA_START_TIME = CASE
WHEN LOOP_FLOW.ETL_CYC_CODE = '01' THEN
T.ETL_DATA_SUCC_TIME + 1
WHEN LOOP_FLOW.ETL_CYC_CODE = '02' THEN
T.ETL_DATA_SUCC_TIME + 1 / 24 / 60 / 60
WHEN LOOP_FLOW.ETL_CYC_CODE = '03' THEN
T.ETL_DATA_SUCC_TIME + 1
END,T.ETL_DATA_END_TIME = T.ETL_NEXT_DATA_TIME,T.ETL_START_TIME = NULL,T.ETL_END_TIME = NULL,T.ETL_FLOW_RUN_STATUS = 0,T.ETL_RESET_TIME = 0
WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
COMMIT;
ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1
AND LOOP_FLOW.ETL_FLOW_STATUS = 1
AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 0
THEN
DTE_DATA_START_TIME := FN_GET_SUPER_DATA_START_TIME(LOOP_FLOW.ETL_FLOW_ID);
DTE_DATA_END_TIME := FN_GET_SUPER_DATA_END_TIME(LOOP_FLOW.ETL_FLOW_ID);
UPDATE ETL_FLOW_RUN_STS T
SET T.ETL_DATA_START_TIME = DTE_DATA_START_TIME,T.ETL_DATA_END_TIME = DTE_DATA_END_TIME,T.ETL_FLOW_RUN_STATUS = 0
WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
COMMIT;
END IF;
END LOOP;
FOR LOOP_JOB IN (SELECT A.ETL_JOB_ID,T.ETL_DATA_START_TIME FLOW_DATA_START_TIME,T.ETL_DATA_END_TIME FLOW_DATA_END_TIME,A.ETL_DATA_START_TIME JOB_DATA_START_TIME,A.ETL_DATA_END_TIME JOB_DATA_END_TIME,B.ETL_JOB_STATUS
FROM ETL_FLOW_RUN_STS T,ETL_JOB_RUN_STS A,ETL_CTL_JOB_INFO B
WHERE T.ETL_FLOW_ID = B.ETL_FLOW_ID
AND A.ETL_JOB_ID = B.ETL_JOB_ID
AND B.ETL_JOB_STATUS = 1) LOOP
IF LOOP_JOB.ETL_FLOW_RUN_STATUS = 0
AND LOOP_JOB.ETL_JOB_STATUS = 1
THEN
UPDATE ETL_JOB_RUN_STS T
SET T.ETL_DATA_START_TIME = LOOP_JOB.FLOW_DATA_START_TIME,T.ETL_DATA_END_TIME = LOOP_JOB.FLOW_DATA_END_TIME,T.ETL_JOB_RUN_STATUS = 0,T.ETL_SESSION_ID = NULL,T.ETL_LOG_DESC = NULL
WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID;
END IF;
END LOOP;
COMMIT;
END;
/*******************************************************************
程序名 :SP_FLOW_RUN_NEW_PERIOD
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 运行新周期的数据
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
PROCEDURE SP_FLOW_RUN_NEW_PERIOD IS
BEGIN
FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,ETL_CTL_JOB_FLOW A
WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
AND A.ETL_FLOW_STATUS = 1
ORDER BY A.ETL_FLOW_LEVEL) LOOP
IF LOOP_FLOW.ETL_FLOW_LEVEL = 1
AND LOOP_FLOW.ETL_FLOW_RUN_STATUS IN (0,3)
AND FN_GET_FLOW_DEPEND(LOOP_FLOW.ETL_FLOW_ID) = 1
THEN
UPDATE ETL_FLOW_RUN_STS T
SET T.ETL_START_TIME = SYSDATE,T.ETL_FLOW_RUN_STATUS = 1
WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
COMMIT;
ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1
AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 1
AND LOOP_FLOW.ETL_FLOW_RUN_STATUS IN (0,3)
AND FN_GET_FLOW_DEPEND(LOOP_FLOW.ETL_FLOW_ID) = 1
THEN
UPDATE ETL_FLOW_RUN_STS T
SET T.ETL_START_TIME = SYSDATE,T.ETL_FLOW_RUN_STATUS = 1
WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
COMMIT;
END IF;
END LOOP;
END;
/*******************************************************************
程序名 :SP_FLOW_RUN_STATUS
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 作业流状态处理
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
PROCEDURE SP_FLOW_RUN_STATUS IS
-- VAR_JOB_NO_SESSION VARCHAR2(100); -- 数据库进程不存在
VAR_JOB_OVERTIME_DESC VARCHAR2(100); -- 作业运行超时描述
INT_JOB_OVERTIME NUMBER; -- 作业超时告警时间
INT_LAST_RUNTIME NUMBER; -- 作业上次运行时间
-- INT_JOB_DEAD_TIME NUMBER; -- 调度作业数据库进程不存在运行判定时间
BEGIN
-- 获取数据库进程不存在描述
/*SELECT T.ETL_PARA_VAL
INTO VAR_JOB_NO_SESSION
FROM ETL_CTL_PARA T
WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_NO_SESSION_DESC';*/
-- 获取作业运行超时告警时间
SELECT T.ETL_PARA_VAL INTO INT_JOB_OVERTIME FROM ETL_CTL_PARA T WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_OVERTIME';
-- 调度作业数据库进程不存在运行判定时间
/*SELECT T.ETL_PARA_VAL
INTO INT_JOB_DEAD_TIME
FROM ETL_CTL_PARA T
WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_DEAD_TIME';*/
-- 获取作业运行超时描述
SELECT T.ETL_PARA_VAL
INTO VAR_JOB_OVERTIME_DESC
FROM ETL_CTL_PARA T
WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_OVERTIME_DESC';
FOR LOOP_JOB IN (SELECT T.ETL_JOB_ID,A.ETL_OVERTIME_REM_WAY
--,T.ETL_SESSION_ID,(SYSDATE - T.ETL_START_TIME) RUNTIME
FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A
WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
AND T.ETL_JOB_RUN_STATUS = 1) LOOP
-- 作业为正在运行,但数据库进程已经不存在(10分钟)的作业置为运行失败
/*IF FN_GET_SESSION_STATUS(LOOP_JOB.ETL_SESSION_ID) = 0
AND LOOP_JOB.RUNTIME * 24 * 60 >= INT_JOB_DEAD_TIME
THEN
SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID,'',VAR_JOB_NO_SESSION);
-- 超时提醒方式为超过上次运行时间
ELS*/
IF LOOP_JOB.ETL_OVERTIME_REM_WAY = 1
AND INT_JOB_OVERTIME > 0
THEN
BEGIN
SELECT RUNTIME
INTO INT_LAST_RUNTIME
FROM (SELECT T.ETL_LOGID,T.ETL_JOB_ID,(T.ETL_END_TIME - T.ETL_START_TIME) * 24 * 60 RUNTIME,ROW_NUMBER() OVER(ORDER BY T.ETL_LOGID DESC) ROW_NUM
FROM ETL_JOB_RUN_LOG T
WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID
AND T.ETL_JOB_RUN_STATUS = 9)
WHERE ROW_NUM = 1;
IF LOOP_JOB.RUNTIME - INT_LAST_RUNTIME > INT_JOB_OVERTIME
THEN
-- 修改作业状态为运行超时
SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID,4,VAR_JOB_OVERTIME_DESC);
END IF;
EXCEPTION
WHEN OTHERS THEN
NULL;
END;
-- 超时提醒方式为超过前三次的平均值
ELSIF LOOP_JOB.ETL_OVERTIME_REM_WAY = 2
AND INT_JOB_OVERTIME > 0
THEN
BEGIN
SELECT AVG(RUNTIME)
INTO INT_LAST_RUNTIME
FROM (SELECT T.ETL_LOGID,ROW_NUMBER() OVER(ORDER BY T.ETL_LOGID DESC) ROW_NUM
FROM ETL_JOB_RUN_LOG T
WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID
AND T.ETL_JOB_RUN_STATUS = 9)
WHERE ROW_NUM <= 3;
IF LOOP_JOB.RUNTIME - INT_LAST_RUNTIME > INT_JOB_OVERTIME
THEN
-- 修改作业状态为运行超时
SP_JOB_RUN_STATUS(LOOP_JOB.ETL_JOB_ID,VAR_JOB_OVERTIME_DESC);
END IF;
EXCEPTION
WHEN OTHERS THEN
NULL;
END;
END IF;
END LOOP;
FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,T.ETL_DATA_END_TIME
FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A
WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
AND T.ETL_FLOW_RUN_STATUS = 1
ORDER BY A.ETL_FLOW_LEVEL DESC) LOOP
-- 下属作业运行成功将作业流状态置为成功
IF FN_GET_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 9
THEN
UPDATE ETL_FLOW_RUN_STS T
SET T.ETL_DATA_SUCC_TIME = LOOP_FLOW.ETL_DATA_END_TIME,T.ETL_END_TIME = SYSDATE,T.ETL_FLOW_RUN_STATUS = 9
WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
COMMIT;
SP_ETL_FLOW_LOG_INFO(LOOP_FLOW.ETL_FLOW_ID);
-- 下属作业运行失败将作业流置为失败
ELSIF FN_GET_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 2
THEN
UPDATE ETL_FLOW_RUN_STS T
SET T.ETL_END_TIME = SYSDATE,T.ETL_FLOW_RUN_STATUS = 2
WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
COMMIT;
SP_ETL_FLOW_LOG_INFO(LOOP_FLOW.ETL_FLOW_ID);
END IF;
END LOOP;
END;
/*******************************************************************
程序名 :SP_FLOW_RUN_ERROR_RESET
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 失败任务重置,等待重新运行
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
PROCEDURE SP_FLOW_RUN_ERROR_RESET IS
VAR_MAX_RESET_TIME NUMBER;
BEGIN
-- 获取最大任务重置次数
SELECT T.ETL_PARA_VAL
INTO VAR_MAX_RESET_TIME
FROM ETL_CTL_PARA T
WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_MAX_RESET_TIME';
FOR LOOP_FLOW IN (SELECT T.ETL_FLOW_ID,T.ETL_RESET_TIME
FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A
WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
AND A.ETL_FLOW_STATUS = 1
AND T.ETL_FLOW_RUN_STATUS = 2
ORDER BY A.ETL_FLOW_LEVEL) LOOP
IF LOOP_FLOW.ETL_FLOW_LEVEL = 1
AND (VAR_MAX_RESET_TIME = 0 OR LOOP_FLOW.ETL_RESET_TIME < VAR_MAX_RESET_TIME)
THEN
UPDATE ETL_FLOW_RUN_STS T
SET /*T.ETL_START_TIME = NULL,*/ T.ETL_END_TIME = NULL,T.ETL_FLOW_RUN_STATUS = 3,T.ETL_RESET_TIME = T.ETL_RESET_TIME + 1
WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
COMMIT;
ELSIF LOOP_FLOW.ETL_FLOW_LEVEL > 1
AND FN_GET_SUPER_FLOW_RUN_STATUS(LOOP_FLOW.ETL_FLOW_ID) = 3
THEN
UPDATE ETL_FLOW_RUN_STS T
SET /*T.ETL_START_TIME = NULL,T.ETL_FLOW_RUN_STATUS = 3
WHERE T.ETL_FLOW_ID = LOOP_FLOW.ETL_FLOW_ID;
COMMIT;
END IF;
END LOOP;
FOR LOOP_JOB IN (SELECT A.ETL_JOB_ID,A.ETL_JOB_RUN_STATUS
FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_INFO B
WHERE T.ETL_FLOW_ID = B.ETL_FLOW_ID
AND A.ETL_JOB_ID = B.ETL_JOB_ID
AND B.ETL_JOB_STATUS = 1) LOOP
-- 将运行失败的作业置为重新运行,等待重新运行
IF LOOP_JOB.ETL_FLOW_RUN_STATUS = 3
AND LOOP_JOB.ETL_JOB_RUN_STATUS = 2
THEN
UPDATE ETL_JOB_RUN_STS T
SET /*T.ETL_START_TIME = NULL,T.ETL_JOB_RUN_STATUS = 3,T.ETL_LOG_DESC = NULL
WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID;
END IF;
END LOOP;
COMMIT;
END;
/*******************************************************************
程序名 :SP_JOB_RUN_STATUS
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 作业运行状态处理
修改人 : zhuyh
修改时间 :2013/9/30
修改原因 : 进程ID由记录数据库进程改为记录操作系统进程
*******************************************************************/
PROCEDURE SP_JOB_RUN_STATUS
(
I_JOB_ID NUMBER,I_JOB_RUN_INFO VARCHAR2
) IS
-- VAR_SESSION_ID VARCHAR2(10);
DTE_DATA_END_TIME DATE;
VAR_JOB_SUCC_DESC VARCHAR2(100);
VAR_JOB_ERR_DESC VARCHAR2(100);
BEGIN
-- 获取作业运行成功描述
SELECT T.ETL_PARA_VAL
INTO VAR_JOB_SUCC_DESC
FROM ETL_CTL_PARA T
WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_SUCC_DESC';
-- 获取作业运行失败描述
SELECT T.ETL_PARA_VAL INTO VAR_JOB_ERR_DESC FROM ETL_CTL_PARA T WHERE UPPER(T.ETL_PARA_NAME) = 'ETL_JOB_ERR_DESC';
SELECT ETL_DATA_END_TIME INTO DTE_DATA_END_TIME FROM ETL_JOB_RUN_STS T WHERE T.ETL_JOB_ID = I_JOB_ID;
-- 正在运行
IF I_JOB_RUN_STATUS = 1
THEN
-- del by zhuyh 2013/9/30 进程ID由记录数据库进程改为记录操作系统进程
-- 获取数据库进程
-- SELECT SYS_CONTEXT('USERENV','SID') INTO VAR_SESSION_ID FROM DUAL;
UPDATE ETL_JOB_RUN_STS T
SET /*T.ETL_START_TIME = SYSDATE,*/ T.ETL_DATA_ORG_ID = I_ORG_ID,T.ETL_JOB_RUN_STATUS = I_JOB_RUN_STATUS
-- del by zhuyh 2013/9/30 进程ID由记录数据库进程改为记录操作系统进程
--,T.ETL_SESSION_ID = VAR_SESSION_ID,T.ETL_LOG_DESC = I_JOB_RUN_INFO
WHERE T.ETL_JOB_ID = I_JOB_ID;
COMMIT;
-- 运行失败
ELSIF I_JOB_RUN_STATUS = 2
THEN
UPDATE ETL_JOB_RUN_STS T
SET T.ETL_END_TIME = SYSDATE,T.ETL_JOB_RUN_STATUS = I_JOB_RUN_STATUS,T.ETL_LOG_DESC = VAR_JOB_ERR_DESC || I_JOB_RUN_INFO
WHERE T.ETL_JOB_ID = I_JOB_ID;
COMMIT;
-- 写失败日志
SP_ETL_JOB_LOG_INFO(I_JOB_ID);
-- 运行成功
ELSIF I_JOB_RUN_STATUS = 9
THEN
UPDATE ETL_JOB_RUN_STS T
SET T.ETL_DATA_SUCC_TIME = DTE_DATA_END_TIME,T.ETL_LOG_DESC = VAR_JOB_SUCC_DESC || I_JOB_RUN_INFO
WHERE T.ETL_JOB_ID = I_JOB_ID;
COMMIT;
-- 写失败日志
SP_ETL_JOB_LOG_INFO(I_JOB_ID);
END IF;
END;
/*******************************************************************
程序名 :SP_ETL_FLOW_LOG_INFO
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 记录作业流运行日志
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
PROCEDURE SP_ETL_FLOW_LOG_INFO(I_FLOW_ID NUMBER) IS
DTE_DATA_START_TIME DATE;
DTE_DATA_END_TIME DATE;
INT_COUNT NUMBER;
VAR_ETL_LOGID VARCHAR2(30); -- 日志序号在每个数据周期内排序
BEGIN
SELECT T.ETL_DATA_START_TIME,T.ETL_DATA_END_TIME
INTO DTE_DATA_START_TIME,DTE_DATA_END_TIME
FROM ETL_FLOW_RUN_STS T
WHERE T.ETL_FLOW_ID = I_FLOW_ID;
-- 检查该数据周期有没有运行过
SELECT COUNT(*)
INTO INT_COUNT
FROM ETL_FLOW_RUN_LOG T
WHERE T.ETL_FLOW_ID = I_FLOW_ID
AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME
AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME;
-- 未运行过的使用数据开始时间从新编号
IF INT_COUNT = 0
THEN
VAR_ETL_LOGID := TO_CHAR(DTE_DATA_START_TIME,'YYYYMMDDHH24MISS') ||
TO_CHAR(DTE_DATA_END_TIME,'YYYYMMDDHH24MISS') || '01';
ELSE
-- 运行过的用最大编号加1
SELECT MAX(ETL_LOGID)
INTO VAR_ETL_LOGID
FROM ETL_FLOW_RUN_LOG T
WHERE T.ETL_FLOW_ID = I_FLOW_ID
AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME
AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME;
VAR_ETL_LOGID := VAR_ETL_LOGID + 1;
END IF;
-- 记日志表
INSERT INTO ETL_FLOW_RUN_LOG
(ETL_LOGID,ETL_FLOW_ID,ETL_DATA_START_TIME,ETL_DATA_END_TIME,ETL_START_TIME,ETL_END_TIME,ETL_FLOW_RUN_STATUS,ETL_LOG_DESC)
SELECT VAR_ETL_LOGID,T.ETL_FLOW_ID,T.ETL_START_TIME,T.ETL_END_TIME,T.ETL_LOG_DESC
FROM ETL_FLOW_RUN_STS T
WHERE T.ETL_FLOW_ID = I_FLOW_ID;
COMMIT;
END;
/*******************************************************************
程序名 :SP_ETL_JOB_LOG_INFO
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 记录作业运行日志
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
PROCEDURE SP_ETL_JOB_LOG_INFO(I_JOB_ID NUMBER) IS
DTE_DATA_START_TIME DATE;
DTE_DATA_END_TIME DATE;
INT_COUNT NUMBER;
VAR_ETL_LOGID VARCHAR2(30); -- 日志序号在每个数据周期内排序
BEGIN
SELECT T.ETL_DATA_START_TIME,DTE_DATA_END_TIME
FROM ETL_JOB_RUN_STS T
WHERE T.ETL_JOB_ID = I_JOB_ID;
-- 检测该数据周期任务有没有运行过
SELECT COUNT(*)
INTO INT_COUNT
FROM ETL_JOB_RUN_LOG T
WHERE T.ETL_JOB_ID = I_JOB_ID
AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME
AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME;
-- 未运行过的作业使用数据结束时间重新编号
IF INT_COUNT = 0
THEN
VAR_ETL_LOGID := TO_CHAR(DTE_DATA_START_TIME,'YYYYMMDDHH24MISS') || '01';
ELSE
-- 运行过的作业用最大编号加1
SELECT MAX(ETL_LOGID)
INTO VAR_ETL_LOGID
FROM ETL_JOB_RUN_LOG T
WHERE T.ETL_JOB_ID = I_JOB_ID
AND T.ETL_DATA_START_TIME = DTE_DATA_START_TIME
AND T.ETL_DATA_END_TIME = DTE_DATA_END_TIME;
VAR_ETL_LOGID := VAR_ETL_LOGID + 1;
END IF;
-- 记日志表
INSERT INTO ETL_JOB_RUN_LOG
(ETL_LOGID,ETL_JOB_ID,ETL_DATA_ORG_ID,ETL_JOB_RUN_STATUS,T.ETL_DATA_ORG_ID,T.ETL_JOB_RUN_STATUS,T.ETL_LOG_DESC
FROM ETL_JOB_RUN_STS T
WHERE T.ETL_JOB_ID = I_JOB_ID;
COMMIT;
END;
/*******************************************************************
程序名 :SP_INSERT_JOB_FAIL_SMS
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 生成失败作业短信
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
PROCEDURE SP_INSERT_MONITOR_SMS
(
O_RESULT_FLAG OUT VARCHAR2 /*过程执行结果返回给调度 9 成功 2 失败*/,O_RESULT_MSG OUT VARCHAR2 /*过程执行结果信息返回给调度*/
) IS
V_DTE_RUN_BEGIN_DT DATE; /*程序每一步骤运行开始*/
V_DTE_RUN_END_DT DATE; /*程序每一步骤运行结束时间*/
V_INT_STEP NUMBER := 0; /*程序执行步骤*/
/*步骤描述信息*/
V_VAR_STEP_DESC VARCHAR2(1000);
/*步骤所执行的DML类型*/
V_VAR_STEP_DML_TYPE VARCHAR2(10);
/*受影响行数*/
V_INT_ROW_CNT INTEGER := 0;
/*过程名称*/
V_VAR_PROC_NAME VARCHAR2(70) := 'PKG_SEND_SMS.SP_INSERT_SMS';
BEGIN
V_INT_STEP := V_INT_STEP + 1; /*第一步骤*/
V_VAR_STEP_DESC := V_INT_STEP || '.0:作业运行失败发送短信给运营人员 ';
V_VAR_STEP_DML_TYPE := 'INSERT'; /*操作类型*/
/*DML开始运行时间*/
V_DTE_RUN_BEGIN_DT := SYSDATE;
/*执行相应的SQL语句*/
FOR LOOP_JOB IN (SELECT C.MOBLIE_PHONE,T.ETL_LOGID,A.ETL_JOB_ID,A.ETL_JOB_NAME,A.ETL_JOB_DESC,T.ETL_LOG_DESC
FROM ETL_JOB_RUN_LOG T,ETL_CTL_JOB_INFO A,ETL_SEND_SMS_LIST C
WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
AND T.ETL_JOB_RUN_STATUS = 2
AND T.ETL_SEND_FLAG = 0
AND C.SEND_MONITOR_SMS = 1) LOOP
INSERT INTO ETL_SEND_SMS_LOG
(ETL_LOGID,MOBLIE_PHONE,SMS_CONTENT,SMS_LEVEL,SEND_TIME,ETL_DATE)
VALUES
(LOOP_JOB.ETL_LOGID,LOOP_JOB.ETL_JOB_ID,LOOP_JOB.MOBLIE_PHONE,'作业ID号[' || LOOP_JOB.ETL_JOB_ID || '],作业名称[' || LOOP_JOB.ETL_JOB_NAME || '],作业描述[' || LOOP_JOB.ETL_JOB_DESC ||
'],数据周期[' || TO_CHAR(LOOP_JOB.ETL_DATA_START_TIME,'YYYYMMDDHH24MISS') || '-' ||
TO_CHAR(LOOP_JOB.ETL_DATA_END_TIME,'YYYYMMDDHH24MISS') || '],机构号[' || LOOP_JOB.ETL_DATA_ORG_ID || '],运行描述[' ||
LOOP_JOB.ETL_LOG_DESC || ']',1,TRUNC(SYSDATE),SYSDATE);
/*获取受影响行数*/
V_INT_ROW_CNT := V_INT_ROW_CNT + 1;
UPDATE ETL_JOB_RUN_LOG T
SET T.ETL_SEND_FLAG = 1
WHERE T.ETL_JOB_ID = LOOP_JOB.ETL_JOB_ID
AND T.ETL_LOGID = LOOP_JOB.ETL_LOGID;
COMMIT; ---提交DML操作
END LOOP;
/*DML运行结束时间*/
V_DTE_RUN_END_DT := SYSDATE;
/*记录成功的日志信息*/
IF V_INT_ROW_CNT > 0
THEN
PKG_PUBLIC.SP_ETL_LOAD_DML_LOG(V_INT_STEP,SYSDATE /*数据开始日期*/,SYSDATE /*数据结束日期*/,1 /*机构代码*/,V_VAR_PROC_NAME /*存储过程名称*/,V_VAR_STEP_DESC /*操作步骤描述*/,V_VAR_STEP_DML_TYPE /*操作类型*/,V_INT_ROW_CNT /*受影响行数*/,1 /*执行结果*/,V_DTE_RUN_BEGIN_DT /*运行开始时间*/,V_DTE_RUN_END_DT /*运行结束时间*/,'' /*运行结果详细信息*/);
END IF;
/*整个过程执行成功*/
O_RESULT_FLAG := 9;
/*整个过程运行结果描述信息*/
O_RESULT_MSG := '';
/*异常处理部分*/
EXCEPTION
WHEN OTHERS THEN
---回滚DML操作
ROLLBACK;
O_RESULT_FLAG := 2; ----失败
O_RESULT_MSG := SQLERRM;
---记录异常日志信息
PKG_PUBLIC.SP_ETL_LOAD_DML_LOG(V_INT_STEP,V_VAR_STEP_DML_TYPE /*操作步骤类型*/,V_INT_ROW_CNT /*返回的受影响行数*/,0 /*运行结果 0 失败; 1 成功*/,SQLERRM /*运行结果详细信息*/);
END;
/*******************************************************************
程序名 :SP_SEND_JOB_FAIL_SMS
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 发送失败作业短信
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
PROCEDURE SP_SEND_MONITOR_SMS IS
V_VAR_RESULT_FLAG CHAR(1);
V_VAR_RESULT_MSG CHAR(300);
V_VAR_RETURN_STATUS INT;
CURSOR C_DATA IS
SELECT T.ETL_LOGID,T.MOBLIE_PHONE,T.SMS_CONTENT,T.SMS_LEVEL,T.SEND_TIME
FROM ETL_SEND_SMS_LOG T
WHERE T.SEND_STATUS = 0;
V_USER_CODE VARCHAR2(100) DEFAULT 'MIS';
V_PASSWORD VARCHAR2(100) DEFAULT 'MIS#2013';
BEGIN
SP_INSERT_MONITOR_SMS(V_VAR_RESULT_FLAG,V_VAR_RESULT_MSG);
IF V_VAR_RESULT_FLAG = 9
THEN
FOR CC_DATA IN C_DATA LOOP
PKG_SMS_INTERFACE.SEND_SMS(V_USER_CODE,V_PASSWORD,CC_DATA.MOBLIE_PHONE,CC_DATA.SMS_CONTENT,CC_DATA.SMS_LEVEL,CC_DATA.SEND_TIME,V_VAR_RETURN_STATUS);
--发送成功,更新标志
IF V_VAR_RETURN_STATUS > 0
THEN
UPDATE ETL_SEND_SMS_LOG T
SET T.SEND_STATUS = '1',T.RECEIVE_STATUS = V_VAR_RETURN_STATUS
WHERE T.ETL_LOGID = CC_DATA.ETL_LOGID
AND T.ETL_JOB_ID = CC_DATA.ETL_JOB_ID
AND T.MOBLIE_PHONE = CC_DATA.MOBLIE_PHONE;
END IF;
END LOOP;
END IF;
COMMIT;
END;
/*******************************************************************
程序名 :FN_GET_FLOW_DEPEND
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 获取作业流前置依赖
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
FUNCTION FN_GET_FLOW_DEPEND(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS
VAR_DEP_STS VARCHAR2(100) := 1;
BEGIN
FOR LOOP_DEP IN (SELECT A.ETL_DATA_END_TIME,NVL(B.ETL_DATA_SUCC_TIME,DATE '1900-1-1') ETL_DEP_SUCC_TIME,PKG_ETL_CTL.FN_GET_CYC_CODE(T.ETL_FLOW_ID) ETL_CYC_CODE,PKG_ETL_CTL.FN_GET_CYC_CODE(T.ETL_DEPD_FLOW_ID) DEP_CYC_CODE
FROM ETL_CTL_FLOW_DEPD T,ETL_FLOW_RUN_STS A,ETL_FLOW_RUN_STS B
WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
AND T.ETL_DEPD_FLOW_ID = B.ETL_FLOW_ID
AND T.ETL_FLOW_ID = I_FLOW_ID) LOOP
-- 数据结束时间大于所依赖的数据成功时间
IF (LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME AND LOOP_DEP.ETL_CYC_CODE = LOOP_DEP.DEP_CYC_CODE)
OR (LOOP_DEP.ETL_DATA_END_TIME >= TRUNC(LOOP_DEP.ETL_DEP_SUCC_TIME) AND LOOP_DEP.ETL_CYC_CODE <> '02' AND
LOOP_DEP.DEP_CYC_CODE = '02')
OR (LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME AND LOOP_DEP.DEP_CYC_CODE <> '02')
THEN
VAR_DEP_STS := 0;
RETURN VAR_DEP_STS;
END IF;
END LOOP;
RETURN VAR_DEP_STS;
EXCEPTION
WHEN OTHERS THEN
RETURN NULL;
END;
/*******************************************************************
程序名 :FN_GET_JOB_DEPEND
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 获取作业前置依赖
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
FUNCTION FN_GET_JOB_DEPEND(I_JOB_ID NUMBER) RETURN VARCHAR2 IS
VAR_DEP_STS VARCHAR2(100) := 1;
BEGIN
FOR LOOP_DEP IN (SELECT A.ETL_DATA_END_TIME,DATE '1900-1-1') ETL_DEP_SUCC_TIME
FROM ETL_CTL_JOB_DEPD T,ETL_JOB_RUN_STS B
WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
AND T.ETL_DEPD_JOB_ID = B.ETL_JOB_ID
AND T.ETL_JOB_ID = I_JOB_ID) LOOP
-- 数据结束时间大于所依赖的数据成功时间
IF LOOP_DEP.ETL_DATA_END_TIME > LOOP_DEP.ETL_DEP_SUCC_TIME
THEN
VAR_DEP_STS := 0;
RETURN VAR_DEP_STS;
END IF;
END LOOP;
RETURN VAR_DEP_STS;
END;
/*******************************************************************
程序名 :FN_GET_FLOW_RUN_STATUS
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 获取作业流运行状态
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
FUNCTION FN_GET_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS
VAR_FLOW_RUN_STATUS VARCHAR2(100) := 1;
VAR_CHILD_FLAG CHAR(1);
INT_STS_NOT_9 NUMBER;
INT_STS_0_3 NUMBER;
INT_STS_1_4 NUMBER;
INT_STS_2 NUMBER;
BEGIN
SELECT T.ETL_CHILD_FLAG INTO VAR_CHILD_FLAG FROM ETL_CTL_JOB_FLOW T WHERE T.ETL_FLOW_ID = I_FLOW_ID;
-- 不成功的数量
IF VAR_CHILD_FLAG = 0
THEN
SELECT COUNT(*)
INTO INT_STS_NOT_9
FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A
WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
AND A.ETL_FLOW_STATUS = 1
AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID
AND ETL_FLOW_RUN_STATUS <> 9;
-- 正在运行的数量
SELECT COUNT(*)
INTO INT_STS_1_4
FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A
WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
AND A.ETL_FLOW_STATUS = 1
AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID
AND ETL_FLOW_RUN_STATUS IN (1,4);
-- 满足条件但未运行的数量
SELECT COUNT(*)
INTO INT_STS_0_3
FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A
WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
AND A.ETL_FLOW_STATUS = 1
AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID
AND ETL_FLOW_RUN_STATUS IN (0,3)
AND FN_GET_FLOW_DEPEND(T.ETL_FLOW_ID) = 1;
-- 运行失败的数量
SELECT COUNT(*)
INTO INT_STS_2
FROM ETL_FLOW_RUN_STS T,ETL_CTL_JOB_FLOW A
WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
AND A.ETL_FLOW_STATUS = 1
AND A.ETL_SUPER_FLOW_ID = I_FLOW_ID
AND ETL_FLOW_RUN_STATUS = 2;
ELSIF VAR_CHILD_FLAG = 1
THEN
SELECT COUNT(*)
INTO INT_STS_NOT_9
FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A
WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
AND A.ETL_JOB_STATUS = 1
AND A.ETL_FLOW_ID = I_FLOW_ID
AND ETL_JOB_RUN_STATUS <> 9;
-- 正在运行的数量
SELECT COUNT(*)
INTO INT_STS_1_4
FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A
WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
AND A.ETL_JOB_STATUS = 1
AND A.ETL_FLOW_ID = I_FLOW_ID
AND ETL_JOB_RUN_STATUS IN (1,4);
-- 满足条件但未运行的数量
SELECT COUNT(*)
INTO INT_STS_0_3
FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A
WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
AND A.ETL_JOB_STATUS = 1
AND A.ETL_FLOW_ID = I_FLOW_ID
AND ETL_JOB_RUN_STATUS IN (0,3)
AND FN_GET_FLOW_DEPEND(T.ETL_JOB_ID) = 1;
-- 运行失败的数量
SELECT COUNT(*)
INTO INT_STS_2
FROM ETL_JOB_RUN_STS T,ETL_CTL_JOB_INFO A
WHERE T.ETL_JOB_ID = A.ETL_JOB_ID
AND A.ETL_JOB_STATUS = 1
AND A.ETL_FLOW_ID = I_FLOW_ID
AND ETL_JOB_RUN_STATUS = 2;
END IF;
-- 不成功的数量为0,则全部成功
IF INT_STS_NOT_9 = 0
THEN
VAR_FLOW_RUN_STATUS := 9;
-- 不成功的不为0,正在运行的为0,运行失败的数量大于0
ELSIF INT_STS_1_4 = 0
AND INT_STS_0_3 = 0
AND INT_STS_2 > 0
THEN
VAR_FLOW_RUN_STATUS := 2;
END IF;
RETURN VAR_FLOW_RUN_STATUS;
END;
/*******************************************************************
程序名 :FN_GET_NEXT_DATA_TIME
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 获取下个数据时间
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
FUNCTION FN_GET_NEXT_DATA_TIME(I_FLOW_ID NUMBER) RETURN DATE IS
VAR_CTL_CYC_CODE VARCHAR2(2);
VAR_SQL VARCHAR2(1000);
VAR_FREQ_TIME VARCHAR2(100);
VAR_SRC_DB VARCHAR2(100);
DTE_SRC_SYS_TIME DATE;
DTE_NEXT_DATA_TIME DATE;
DTE_DATA_SUCC_TIME DATE;
BEGIN
-- 获取运行周期、源数据库,下次运行时间、数据成功时间
SELECT T.ETL_CYC_CODE,T.ETL_SRC_DB,NVL(A.ETL_NEXT_DATA_TIME,A.ETL_DATA_END_TIME),NVL(A.ETL_DATA_SUCC_TIME,A.ETL_DATA_END_TIME)
INTO VAR_CTL_CYC_CODE,VAR_SRC_DB,DTE_NEXT_DATA_TIME,DTE_DATA_SUCC_TIME
FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A
WHERE T.ETL_FLOW_ID = A.ETL_FLOW_ID
AND T.ETL_FLOW_ID = I_FLOW_ID;
-- 非本数据库
VAR_SRC_DB := VAR_SRC_DB || 'dual';
-- 每天运行的作业流
IF VAR_CTL_CYC_CODE = '01'
THEN
VAR_FREQ_TIME := 1;
VAR_SQL := 'SELECT TRUNC(SYSDATE) FROM ' || VAR_SRC_DB;
EXECUTE IMMEDIATE VAR_SQL
INTO DTE_SRC_SYS_TIME;
DTE_NEXT_DATA_TIME := DTE_DATA_SUCC_TIME + VAR_FREQ_TIME;
IF DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME
THEN
RETURN DTE_NEXT_DATA_TIME;
ELSE
RETURN DTE_DATA_SUCC_TIME;
END IF;
-- 每十分钟运行的作业流
ELSIF VAR_CTL_CYC_CODE = '02'
THEN
VAR_FREQ_TIME := 1 / 24 / 6;
VAR_SQL := 'SELECT SYSDATE - 1 / 24 / 6 FROM ' || VAR_SRC_DB;
EXECUTE IMMEDIATE VAR_SQL
INTO DTE_SRC_SYS_TIME;
WHILE DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME LOOP
DTE_NEXT_DATA_TIME := DTE_NEXT_DATA_TIME + VAR_FREQ_TIME;
END LOOP;
RETURN DTE_NEXT_DATA_TIME;
ELSIF VAR_CTL_CYC_CODE = '03'
THEN
VAR_FREQ_TIME := 1;
VAR_SQL := 'SELECT TRUNC(SYSDATE) FROM ' || VAR_SRC_DB;
EXECUTE IMMEDIATE VAR_SQL
INTO DTE_SRC_SYS_TIME;
DTE_NEXT_DATA_TIME := ADD_MONTHS(DTE_DATA_SUCC_TIME,VAR_FREQ_TIME);
IF DTE_NEXT_DATA_TIME < DTE_SRC_SYS_TIME
THEN
RETURN DTE_NEXT_DATA_TIME;
ELSE
RETURN DTE_DATA_SUCC_TIME;
END IF;
END IF;
END;
/*******************************************************************
程序名 :FN_GET_SUPER_FLOW_RUN_STATUS
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 获取上级作业流运行状态
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
FUNCTION FN_GET_SUPER_FLOW_RUN_STATUS(I_FLOW_ID NUMBER) RETURN VARCHAR2 IS
VAR_SUPER_FLOW_RUN_STATUS VARCHAR2(10);
BEGIN
SELECT ETL_FLOW_RUN_STATUS
INTO VAR_SUPER_FLOW_RUN_STATUS
FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A
WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID
AND T.ETL_FLOW_ID = I_FLOW_ID;
RETURN VAR_SUPER_FLOW_RUN_STATUS;
END;
/*******************************************************************
程序名 :FN_GET_SUPER_DATA_START_TIME
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 获取上级作业流数据开始时间
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
FUNCTION FN_GET_SUPER_DATA_START_TIME(I_FLOW_ID NUMBER) RETURN DATE IS
VAR_SUPER_FLOW_DATA_START_TIME DATE;
BEGIN
SELECT A.ETL_DATA_START_TIME
INTO VAR_SUPER_FLOW_DATA_START_TIME
FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A
WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID
AND T.ETL_FLOW_ID = I_FLOW_ID;
RETURN VAR_SUPER_FLOW_DATA_START_TIME;
END;
/*******************************************************************
程序名 :FN_GET_SUPER_DATA_END_TIME
创建人 : zhuyh
创建时间 : 2013/8/20
功能描述 : 获取上级作业流数据结束时间
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
FUNCTION FN_GET_SUPER_DATA_END_TIME(I_FLOW_ID NUMBER) RETURN DATE IS
VAR_SUPER_FLOW_DATA_END_TIME DATE;
BEGIN
SELECT A.ETL_DATA_END_TIME
INTO VAR_SUPER_FLOW_DATA_END_TIME
FROM ETL_CTL_JOB_FLOW T,ETL_FLOW_RUN_STS A
WHERE T.ETL_SUPER_FLOW_ID = A.ETL_FLOW_ID
AND T.ETL_FLOW_ID = I_FLOW_ID;
RETURN VAR_SUPER_FLOW_DATA_END_TIME;
END;
/*******************************************************************
程序名 :FN_GET_CYC_CODE
创建人 : zhuyh
创建时间 : 2013/8/26
功能描述 : 获取周期代码
修改人 :
修改时间 :
修改原因 :
*******************************************************************/
FUNCTION FN_GET_CYC_CODE(I_FLOW_ID VARCHAR2) RETURN VARCHAR2 IS
VAR_CYC_CODE VARCHAR2(100);
BEGIN
SELECT T.ETL_CYC_CODE INTO VAR_CYC_CODE FROM ETL_CTL_JOB_FLOW T WHERE T.ETL_FLOW_ID = I_FLOW_ID;
RETURN VAR_CYC_CODE;
END;
END PKG_ETL_CTL;
/
以上内容由PHP站长网【52php.cn】收集整理供大家参考研究 如果以上内容对您有帮助,欢迎收藏、点赞、推荐、分享。 (编辑:好新闻门户网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |

