CN-122019473-A - Iceberg table writing method and system based on DataX
Abstract
The application discloses an Iceberg table writing method based on DataX, which aims to solve the problems that DataX lacks an Iceberg write plugin and that existing approaches are inefficient, weak in consistency, and do not support partitions. The method comprises the steps of initializing the Iceberg connection, parsing metadata and creating a Catalog and a table object; converting DataX Records into GenericRecords according to the Iceberg Schema; collecting data and writing it in batches through the FileAppender to generate DataFiles; guaranteeing data consistency by using Iceberg's atomic commit mechanism; performing partition write control; and finally cleaning up resources. According to the method, batch writing reduces I/O and small files, atomic commits guarantee ACID consistency, metadata management is optimized, flexible partitioning and data type conversion are supported, and write efficiency and integration convenience are remarkably improved.
Inventors
- PENG ZHUANG
Assignees
- 中电云计算技术有限公司 (China Electronics Cloud Computing Technology Co., Ltd.)
Dates
- Publication Date
- 2026-05-12
- Application Date
- 2026-02-12
Claims (10)
- 1. A DataX-based Iceberg table writing method, the method comprising: S1, initializing the Iceberg connection, namely parsing the metadata information of the target Iceberg table according to the DataX job configuration, and initializing the Iceberg Catalog instance and the target Iceberg table object; S2, converting the field types in the DataX Record object into data types supported by Iceberg according to the Schema constraint of the target Iceberg table; S3, batch data collection and writing, namely receiving data from the Reader through the RecordReceiver of the DataX Writer and caching the data in a memory buffer, converting the cached Record objects into GenericRecords when the preset batch size is reached or the data stream ends, writing them in batch mode through Iceberg's FileAppender to generate DataFile metadata comprising the file path, file size, and record count, and atomically committing the DataFile metadata to the target Iceberg table through newAppend().appendFile(dataFile).commit(); S4, partition write control, namely specifying the target write partition in the DataX job configuration, associating partition information when the DataFile metadata is generated, and letting Iceberg automatically process partition updates at commit time; S5, resource cleanup, namely closing the FileAppender in the destroy stage of the DataX Writer, and releasing the Iceberg connection and related resources.
- 2. The method of claim 1, wherein in step S1, the metadata information includes a database name, a table name, a warehouse path, and partition information.
- 3. The method of claim 1, wherein in step S1, the initializing of the Iceberg connection further comprises: obtaining the Schema information and historical snapshot information of the Iceberg table by connecting to the Hive Metastore through a HiveCatalog; and locating the data storage location on HDFS according to the warehouse path, and establishing a connection with the distributed file system.
- 4. The method according to claim 1, wherein in step S2, the data type conversion specifically comprises: S21, establishing a mapping relation between the DataX source data types and the Iceberg data types, mapping the BaseObject-derived field types of DataX to Iceberg's StringType, IntegerType, LongType, DoubleType, BooleanType, DateType, TimestampType, or BinaryType; S22, handling null values, wherein if a DataX value is null and the Iceberg field is nullable, the value is set to null; S23, capturing type conversion errors and recording them to the TaskCollector of DataX.
- 5. The method according to claim 1, wherein in step S3, the batch data collection and writing specifically comprises: S31, in the write stage of the DataX Writer, receiving data from the Reader through the RecordReceiver and caching the data in a memory buffer, and executing the batch write operation when the cached data volume reaches the preset batch size or the data stream ends; S32, converting the cached DataX Record objects into Iceberg's GenericRecord format; S33, calling Iceberg's FileAppender to create a file writer in batch mode, and adding the GenericRecord objects to the FileAppender in batches for writing; S34, after the writing is completed, generating DataFile metadata comprising the file path, file size, and record count, and adding the DataFile to the target Iceberg table through Iceberg's atomic commit mechanism.
- 6. The method according to claim 1, characterized in that in step S3: the preset batch size is set through the batchSize parameter of the DataX job configuration; the FileAppender is created in Parquet write mode (Parquet.write), and the GenericRecord objects are added in a single call through appender.addAll(records); the DataFile metadata is generated by the FileAppender and includes the file path, file size, and record count.
- 7. The method according to claim 1, characterized in that in step S3, the atomic commit is effected by means of Iceberg's commit() operation, ensuring that either all written data files are added successfully or none take effect.
- 8. The method according to claim 1, wherein in step S4, the partition write control specifically comprises: S41, parsing the partition parameter in the DataX job configuration, and extracting the partition field and partition value; S42, calling the withPartitionPath method to explicitly specify a Hive-style partition path in the format key=value; and S43, Iceberg automatically updating the partition metadata during the commit operation, without manual maintenance of the partition directory structure.
- 9. A DataX-based Iceberg table writing system, wherein the system, when running, implements the steps of the DataX-based Iceberg table writing method of any one of claims 1-8, the system comprising: a connection initialization module, used for parsing the metadata information of the target Iceberg table according to the DataX job configuration and initializing the Iceberg Catalog instance and the target Iceberg table object; a data type conversion module, used for converting the field types in the DataX Record object into data types supported by Iceberg according to the Schema constraint of the target Iceberg table; a batch data processing module, used for receiving data from the Reader through the RecordReceiver of the DataX Writer and caching the data in a memory buffer, converting the cached Record objects into GenericRecords when the preset batch size is reached or the data stream ends, writing them in batch mode through Iceberg's FileAppender to generate DataFile metadata comprising the file path, file size, and record count, and atomically committing the DataFile metadata to the target Iceberg table through newAppend().appendFile(dataFile).commit(); a partition control module, used for specifying the target write partition in the DataX job configuration, associating partition information when the DataFile metadata is generated, and letting Iceberg automatically process partition updates at commit time; and a resource cleanup module, used for closing the FileAppender and releasing the Iceberg connection and related resources in the destroy stage of the DataX Writer.
- 10. An electronic device, characterized by comprising a memory and a processor; the memory is used for storing a computer program; the processor is used for executing the computer program to implement the steps of the DataX-based Iceberg table writing method as defined in any one of claims 1 to 8.
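As an illustrative sketch of the type-conversion step in claim 4 (S21-S23), the mapping can be expressed as a simple lookup table. The class and the string type names below are stand-ins for the real DataX and Iceberg classes, not the plugin's actual code:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch of the claim-4 type mapping (S21-S23). Type names
 * are plain strings here; the real plugin would map DataX Column types
 * to org.apache.iceberg.types.Types instances.
 */
public class TypeMapper {
    private static final Map<String, String> DATAX_TO_ICEBERG = new HashMap<>();
    static {
        // S21: DataX source type -> Iceberg target type
        DATAX_TO_ICEBERG.put("STRING", "StringType");
        DATAX_TO_ICEBERG.put("INT", "IntegerType");
        DATAX_TO_ICEBERG.put("LONG", "LongType");
        DATAX_TO_ICEBERG.put("DOUBLE", "DoubleType");
        DATAX_TO_ICEBERG.put("BOOL", "BooleanType");
        DATAX_TO_ICEBERG.put("DATE", "DateType");
        DATAX_TO_ICEBERG.put("TIMESTAMP", "TimestampType");
        DATAX_TO_ICEBERG.put("BYTES", "BinaryType");
    }

    /**
     * S22: a null value stays null (assuming the Iceberg field is nullable).
     * S23: an unmapped type raises an error that the plugin would record
     * to DataX's error collector.
     */
    public static String mapType(String dataxType) {
        if (dataxType == null) {
            return null;
        }
        String icebergType = DATAX_TO_ICEBERG.get(dataxType);
        if (icebergType == null) {
            throw new IllegalArgumentException("unsupported DataX type: " + dataxType);
        }
        return icebergType;
    }
}
```

In the actual plugin the unsupported-type branch would not throw to the caller but would hand the bad record to the task's error collector, so a single malformed record does not abort the whole job.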
Description
Iceberg table writing method and system based on DataX

Technical Field

The application relates to the technical field of data synchronization, in particular to an Iceberg table writing method, system, computer-readable storage medium, and electronic device based on DataX.

Background

With the development of big data technology, the data lake (Data Lake) has become a core architecture for enterprises to store and manage massive data. Data lakes typically store data in open formats (e.g., Parquet, ORC) and implement metadata management via a data catalog (Catalog). Apache Iceberg, as a new-generation data lake table format, is widely applied in the data lake field by virtue of characteristics such as ACID transaction support, schema evolution, and hidden partitioning. Iceberg's integration with Hive is one of the typical deployment modes of current data lake architectures. In this mode, the data files of Iceberg tables are stored in a distributed file system (e.g., Hadoop HDFS) in Parquet or ORC format, and the metadata is managed by means of a HiveCatalog, i.e., Iceberg tables are registered in the Hive Metastore, so that Hive SQL can directly query and manage Iceberg tables, and vice versa. This integration not only utilizes the SQL query capability of Hive but also retains the powerful data management characteristics of Iceberg (such as atomic transactions, version rollback, and Schema evolution). Specifically, the data files of Iceberg tables in HDFS are organized under a specified path, and the Hive Metastore stores metadata such as the table name, Schema, partition information, and the data file list (pointed to via manifest files). In the field of data integration, DataX, an Alibaba open-source offline synchronization tool for heterogeneous data sources, is widely applied to data migration and synchronization among different data sources.
While DataX supports integration with data warehouses such as Hive (whose underlying storage often employs columnar formats such as Parquet), it does not yet provide an off-the-shelf, mature write plugin for the increasingly popular Iceberg table format. Existing solutions mostly rely on custom development or indirect approaches to write to Iceberg, and suffer from low efficiency and difficulty in guaranteeing data integrity and consistency. In particular, writing to Iceberg tables record by record causes a large number of I/O operations and frequent metadata updates, which multiplies small files, severely reduces write efficiency, and degrades subsequent query performance; meanwhile, existing methods also lack effective support for writing to a specific partition.

Disclosure of Invention

In order to overcome the above drawbacks of the prior art, the present application provides a new DataX-based Iceberg table writing method and system. The application aims to solve the technical problems in the prior art that DataX lacks a mature write plugin for Iceberg tables, existing write approaches are inefficient, data consistency is difficult to guarantee, and partition writing is not supported. To achieve the above object, the invention adopts the following technical strategies: (1) Iceberg API batch write optimization, namely, for the first time deeply combining the batch data processing capability of DataX with the batch write mechanism of the Iceberg API (FileAppender.addAll), realizing efficient, atomic batch writing oriented to Iceberg tables.
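The batch-collection strategy described above (buffer records in memory, then hand them over in a single call once the batch size is reached or the stream ends) can be sketched as follows. The generic sink is a stand-in for Iceberg's FileAppender.addAll; the class itself is illustrative, not the plugin's real code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Minimal sketch of the batch-write buffering: records accumulate in
 * memory and are flushed to the sink in one call, which reduces per-record
 * I/O and avoids producing one small file per record.
 */
public class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;       // stands in for appender::addAll
    private final List<T> buffer = new ArrayList<>();

    public BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Cache one record; flush automatically when the preset batch size is reached. */
    public void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Also called once at end-of-stream so the final partial batch is not lost. */
    public void flush() {
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));   // one bulk hand-off
            buffer.clear();
        }
    }
}
```

In the plugin, each flush would correspond to one FileAppender write producing one DataFile, whose metadata is then committed atomically; the batchSize value would come from the job configuration.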
(2) An Iceberg write plugin framework for DataX, namely constructing an extensible Iceberg write plugin conforming to the DataX plugin specification, so that DataX natively supports synchronizing data into Iceberg tables, solving the lack of Iceberg write capability in the existing DataX ecosystem. (3) Integrated data type conversion and partition write control, namely integrating an intelligent data type conversion mechanism and flexible partition write control into the process of writing data into Iceberg, meeting the requirements of big data integration scenarios for data accuracy and management flexibility. (4) Deeply optimized I/O and metadata management, namely, through fine-grained calls to Iceberg's batch write APIs, reducing the I/O pressure and metadata operations that the DataX write process imposes on the underlying storage system (e.g., HDFS), realizing high-performance, low-cost data lake writing. Specifically, the application provides the following technical scheme: A first aspect of the present application provides a DataX-based Iceberg table writing method, as shown in fig. 1, the method comprising: S1, initializing the Iceberg connection, namely parsing the metadata information of the target Iceberg table according to the DataX job configuration, and initializing
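A minimal sketch of the S1 configuration parsing, under the assumption of hypothetical key names ("database", "table", "warehouse", "partition") rather than the plugin's actual parameter names:

```java
import java.util.Map;

/**
 * Hedged sketch of step S1's job-configuration parsing: extract the
 * metadata needed to build the Catalog and locate the target table.
 * Key names here are assumptions for illustration only.
 */
public class IcebergWriterConfig {
    public final String database;    // target database in the Hive Metastore
    public final String table;       // target Iceberg table name
    public final String warehouse;   // warehouse path on HDFS
    public final String partition;   // optional Hive-style "key=value" path

    public IcebergWriterConfig(Map<String, String> jobConf) {
        this.database = require(jobConf, "database");
        this.table = require(jobConf, "table");
        this.warehouse = require(jobConf, "warehouse");
        this.partition = jobConf.get("partition");  // null when unpartitioned
    }

    private static String require(Map<String, String> conf, String key) {
        String value = conf.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("missing required config: " + key);
        }
        return value;
    }

    /** Fully qualified identifier used to look the table up in the catalog. */
    public String tableIdentifier() {
        return database + "." + table;
    }
}
```

With this parsed configuration in hand, the plugin would initialize a HiveCatalog against the Metastore, load the table by its identifier, and locate its data under the warehouse path.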