1. 首页
  2. 技术知识

抽取oracle数据到mysql数据库的实现过程

在oracle数据库迁移至mysql数据库,除了oracle数据库模型移到mysql外,还一个重要环节就是要将oracle数据库的数据移到mysql数据库,本人尝试用过多款数据迁移程序,性能都不是很好的,于是自己动手写一个针对于oracle数据库数据迁移到mysql数据程序,其具体过程如下:

1、要抽取mysql表、字段及过滤条件的配制文件imp_data.sql

2、建立一个目录ETL_DIR

3、运行oracle数据库程序P_ETL_ORA_DATA,生成各表的csv数据文件,同时也生成一个导入mysql的脚本文件imp_data.sql

4、导入mysql数据,文件内容如下

load data infile “alarm_hist_inc.csv” into table alarm_hist_inc fields terminated by “,” enclosed by “^” lines terminated by “\r\n”;

load data infile “button_authority.csv” into table button_authority fields terminated by “,” enclosed by “^” lines terminated by “\r\n”;

load data infile “c3_sms_hist_inc.csv” into table c3_sms_hist_inc fields terminated by “,” enclosed by “^” lines terminated by “\r\n”;

load data infile “datapermisson.csv” into table datapermisson fields terminated by “,” enclosed by “^” lines terminated by “\r\n”;
附:数据库脚本P_ETL_ORA_DATA

CREATE OR REPLACE PROCEDURE P_ETL_ORA_DATA

(

  P_ORA_DIR  VARCHAR2,

  P_DATA_PATH VARCHAR2

) IS

  TYPE T_REC IS RECORD(

    TBN VARCHAR2(40),

    WHR VARCHAR2(4000));

  TYPE T_TABS IS TABLE OF T_REC;

  V_TABS   T_TABS := T_TABS();

  V_ETL_DIR  VARCHAR2(40) := P_ORA_DIR;

  V_LOAD_FILE UTL_FILE.FILE_TYPE;

  PROCEDURE ETL_DATA

  (

    P_SQL_STMT VARCHAR2,

    P_DATA_PATH VARCHAR2,

    P_TB_NAME  VARCHAR2

  ) IS

  BEGIN

    DECLARE

      V_VAR_COL  VARCHAR2(32767);

      V_NUM_COL  NUMBER;

      V_DATE_COL DATE;

      V_TMZ    TIMESТAMP;

      V_COLS   NUMBER;

      V_COLS_DESC DBMS_SQL.DESC_TAB;

      V_ROW_STR  VARCHAR2(32767);

      V_COL_STR  VARCHAR2(32767);

      V_SQL_ID  NUMBER;

      V_SQL_REF  SYS_REFCURSOR;

      V_EXP_FILE UTL_FILE.FILE_TYPE;

      V_DATA_PATH VARCHAR2(200);

    BEGIN

      V_DATA_PATH := P_DATA_PATH;

      IF REGEXP_SUBSTR(V_DATA_PATH, ‘\\$’) IS NULL

      THEN

        V_DATA_PATH := V_DATA_PATH || ‘\’;

      END IF;

      V_DATA_PATH := REPLACE(V_DATA_PATH, ‘\’, ‘\\’);

      OPEN V_SQL_REF FOR P_SQL_STMT;

      V_SQL_ID := DBMS_SQL.TO_CURSOR_NUMBER(V_SQL_REF);

      DBMS_SQL.DESCRIBE_COLUMNS(V_SQL_ID, V_COLS, V_COLS_DESC);

      FOR I IN V_COLS_DESC.FIRST .. V_COLS_DESC.LAST

      LOOP

        CASE

          WHEN V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN

            DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_VAR_COL, 32767);

          WHEN V_COLS_DESC(I).COL_TYPE = 2 THEN

            DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_NUM_COL);

          WHEN V_COLS_DESC(I).COL_TYPE = 12 THEN

            DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_DATE_COL);

          WHEN V_COLS_DESC(I).COL_TYPE = 180 THEN

            DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_TMZ);

        END CASE;

      END LOOP;

      DECLARE

        V_FLUSH_OVER PLS_INTEGER := 1;

        V_FILE_OVER PLS_INTEGER := 1;

        V_FILE_NO  PLS_INTEGER := 1;

        V_FILE_NAME VARCHAR2(200);

        V_LINE    VARCHAR2(400);

      BEGIN

        WHILE DBMS_SQL.FETCH_ROWS(V_SQL_ID) > 0

        LOOP

          IF V_FILE_OVER = 1

          THEN

            V_FILE_NAME := P_TB_NAME || ‘_’ || V_FILE_NO || ‘.csv’;

            V_EXP_FILE := UTL_FILE.FOPEN(V_ETL_DIR, V_FILE_NAME, OPEN_MODE => ‘w’, MAX_LINESIZE => 32767);

          END IF;

          V_ROW_STR := ”;

          FOR I IN 1 .. V_COLS

          LOOP

            V_COL_STR := ‘\N’;

            BEGIN

              CASE

                WHEN V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN

                  DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_VAR_COL);

                  IF V_VAR_COL IS NOT NULL

                  THEN

                    V_COL_STR := ‘^’ || V_VAR_COL || ‘^’;

                  END IF;

                WHEN V_COLS_DESC(I).COL_TYPE = 2 THEN

                  DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_NUM_COL);

                  IF V_NUM_COL IS NOT NULL

                  THEN

                    V_COL_STR := V_NUM_COL;

                  END IF;

                WHEN V_COLS_DESC(I).COL_TYPE = 12 THEN

                  DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_DATE_COL);

                  IF V_DATE_COL IS NOT NULL

                  THEN

                    V_COL_STR := ‘^’ || TO_CHAR(V_DATE_COL, ‘yyyy-mm-dd hh24:mi:ss’) || ‘^’;

                  END IF;

                WHEN V_COLS_DESC(I).COL_TYPE IN (180, 181, 231) THEN

                  DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_TMZ);

                  IF V_TMZ IS NOT NULL

                  THEN

                    V_COL_STR := ‘^’ || TO_CHAR(V_TMZ, ‘yyyy-mm-dd hh24:mi:ss.ff6’) || ‘^’;

                  END IF;

              END CASE;

              IF I = 1

              THEN

                V_ROW_STR := V_COL_STR;

              ELSE

                V_ROW_STR := V_ROW_STR || ‘,’ || V_COL_STR;

              END IF;

            END;

          END LOOP;

          UTL_FILE.PUT_LINE(V_EXP_FILE, CONVERT(V_ROW_STR, ‘UTF8’));

          IF V_FILE_OVER > 200000 /*每200000条记录就产生一个新的文件*/

          THEN

            V_FILE_OVER := 1;

            V_FLUSH_OVER := 1;

            V_FILE_NO  := V_FILE_NO + 1;

            UTL_FILE.FCLOSE(V_EXP_FILE);

            V_LINE := ‘load data infile “‘ || V_DATA_PATH || V_FILE_NAME || ‘” into table ‘ || P_TB_NAME;

            V_LINE := V_LINE || ‘ fields terminated by “,” enclosed by “^” lines terminated by “\r\n”;’;

            UTL_FILE.PUT_LINE(V_LOAD_FILE, V_LINE);

            UTL_FILE.FFLUSH(V_LOAD_FILE);

            CONTINUE;

          END IF;

          V_FILE_OVER := V_FILE_OVER + 1;

          IF V_FLUSH_OVER > 2000 /*每2000条记录就刷新缓存,写到文件中 */

          THEN

            UTL_FILE.FFLUSH(V_EXP_FILE);

            V_FLUSH_OVER := 1;

          ELSE

            V_FLUSH_OVER := V_FLUSH_OVER + 1;

          END IF;

        END LOOP;

        DBMS_SQL.CLOSE_CURSOR(V_SQL_ID);

        IF UTL_FILE.IS_OPEN(V_EXP_FILE)

        THEN

          UTL_FILE.FCLOSE(V_EXP_FILE);

          V_LINE := ‘load data infile “‘ || V_DATA_PATH || V_FILE_NAME || ‘” into table ‘ || P_TB_NAME;

          V_LINE := V_LINE || ‘ fields terminated by “,” enclosed by “^” lines terminated by “\r\n”;’;

          UTL_FILE.PUT_LINE(V_LOAD_FILE, V_LINE);

          UTL_FILE.FFLUSH(V_LOAD_FILE);

        END IF;

      END;

    EXCEPTION

      WHEN OTHERS THEN

        IF DBMS_SQL.IS_OPEN(V_SQL_ID)

        THEN

          DBMS_SQL.CLOSE_CURSOR(V_SQL_ID);

        END IF;

        IF UTL_FILE.IS_OPEN(V_EXP_FILE)

        THEN

          UTL_FILE.FCLOSE(V_EXP_FILE);

        END IF;

        DBMS_OUTPUT.PUT_LINE(SQLERRM);

        DBMS_OUTPUT.PUT_LINE(P_SQL_STMT);

    END;

  END;

BEGIN

  BEGIN

    EXECUTE IMMEDIATE ‘create table mysql_etl_tbs(tn varchar2(40),cn varchar2(40),ci number) ‘;

  EXCEPTION

    WHEN OTHERS THEN

      NULL;

  END;

  EXECUTE IMMEDIATE ‘truncate table mysql_etl_tbs’;

  DECLARE

    V_CI    PLS_INTEGER;

    V_CN    VARCHAR2(40);

    V_ETL_COLS VARCHAR2(32767);

    V_TBN   VARCHAR2(30);

    V_ETL_CFG VARCHAR2(32767);

    V_CNF_FILE UTL_FILE.FILE_TYPE;

    V_FROM_POS PLS_INTEGER;

  BEGIN

    V_CNF_FILE := UTL_FILE.FOPEN(V_ETL_DIR, ‘ETL_TABS.CNF’, ‘r’, 32767);

    LOOP

      UTL_FILE.GET_LINE(V_CNF_FILE, V_ETL_CFG, 32767);

      V_FROM_POS := REGEXP_INSTR(V_ETL_CFG, ‘from’, 1, 1, 0, ‘i’);

      V_ETL_COLS := SUBSTR(V_ETL_CFG, 1, V_FROM_POS – 1);

      V_ETL_COLS := REGEXP_SUBSTR(V_ETL_COLS, ‘(select)(.+)’, 1, 1, ‘i’, 2);

      V_TBN   := REGEXP_SUBSTR(V_ETL_CFG, ‘(\s+from\s+)(\w+)(\s*)’, 1, 1, ‘i’, 2);

      V_TBN   := UPPER(V_TBN);

      V_TABS.EXTEND();

      V_TABS(V_TABS.LAST).TBN := V_TBN;

      V_TABS(V_TABS.LAST).WHR := REGEXP_SUBSTR(V_ETL_CFG, ‘\s+where .+’, 1, 1, ‘i’);

      V_CI := 1;

      LOOP

        V_CN := REGEXP_SUBSTR(V_ETL_COLS, ‘\S+’, 1, V_CI);

        EXIT WHEN V_CN IS NULL;

        V_CN := UPPER(V_CN);

        EXECUTE IMMEDIATE ‘insert into mysql_etl_tbs(tn,cn,ci) values(:1,:2,:3)’

          USING V_TBN, V_CN, V_CI;

        COMMIT;

        V_CI := V_CI + 1;

      END LOOP;

    END LOOP;

  EXCEPTION

    WHEN UTL_FILE.INVALID_PATH THEN

      DBMS_OUTPUT.PUT_LINE(‘指定的目录:ETL_DIR”‘ || ‘”无效!’);

      RETURN;

    WHEN UTL_FILE.INVALID_FILENAME THEN

      DBMS_OUTPUT.PUT_LINE(‘指定的文件:” ETL_TABS.CNF’ || ‘”无效!’);

      RETURN;

    WHEN NO_DATA_FOUND THEN

      UTL_FILE.FCLOSE(V_CNF_FILE);

    WHEN OTHERS THEN

      DBMS_OUTPUT.PUT_LINE(SQLERRM);

      RETURN;

  END;

  DECLARE

    V_CUR_MATCH  SYS_REFCURSOR;

    V_SQL_SMT   VARCHAR2(32767);

    V_TN     VARCHAR2(40);

    V_CN     VARCHAR2(40);

    V_CI     PLS_INTEGER;

    V_COLUMN_NAME VARCHAR2(40);

    V_ETL_COLS  VARCHAR2(32767);

    V_LINE    VARCHAR2(4000);

    V_TBN     VARCHAR2(40);

  BEGIN

    V_LOAD_FILE := UTL_FILE.FOPEN(V_ETL_DIR, ‘load_data.sql’, OPEN_MODE => ‘w’, MAX_LINESIZE => 32767);

    FOR T_IX IN V_TABS.FIRST .. V_TABS.LAST

    LOOP

      V_SQL_SMT := ‘select tn,cn,column_name,ci from ( select * from mysql_etl_tbs where tn=”:tbn:” ) l left join user_tab_columns r on l.tn = r.table_name and l.cn= r.column_name order by ci’;

      V_TBN   := V_TABS(T_IX).TBN;

      V_SQL_SMT := REPLACE(V_SQL_SMT, ‘:tbn:’, V_TBN);

      V_ETL_COLS := NULL;

      OPEN V_CUR_MATCH FOR V_SQL_SMT;

      LOOP

        FETCH V_CUR_MATCH

          INTO V_TN, V_CN, V_COLUMN_NAME, V_CI;

        EXIT WHEN V_CUR_MATCH%NOTFOUND;

        IF V_CI > 1

        THEN

          V_ETL_COLS := V_ETL_COLS || ‘ , ‘;

        END IF;

        IF V_COLUMN_NAME IS NULL

        THEN

          V_ETL_COLS := V_ETL_COLS || ‘ cast(null as number) ‘ || V_CN;

        ELSE

          V_ETL_COLS := V_ETL_COLS || V_CN;

        END IF;

      END LOOP;

      CLOSE V_CUR_MATCH;

      V_TBN   := LOWER(V_TBN);

      V_SQL_SMT := ‘select ‘ || V_ETL_COLS || ‘ from ‘ || V_TBN || V_TABS(T_IX).WHR;

      ETL_DATA(V_SQL_SMT, P_DATA_PATH, V_TBN);

    END LOOP;

    IF UTL_FILE.IS_OPEN(V_LOAD_FILE)

    THEN

      UTL_FILE.FCLOSE(V_LOAD_FILE);

    END IF;

  END;

END P_ETL_ORA_DATA;
总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对共生网络的支持。如果你想了解更多相关内容请查看下面相关链接

原创文章,作者:starterknow,如若转载,请注明出处:https://www.starterknow.com/115421.html

联系我们