Search

CN-121210499-B - Method, device, equipment and medium for real-time intelligent analysis and processing of streaming data

CN121210499BCN 121210499 BCN121210499 BCN 121210499BCN-121210499-B

Abstract

The application relates to a method, a device, equipment and a medium for real-time intelligent analysis and processing of streaming data. The method comprises the steps of obtaining query requests of streaming data, analyzing logic structures of the query requests to obtain corresponding query grammar trees, reading metadata of a unified storage cluster based on the query grammar trees to generate physical execution paths, estimating cost of each physical execution path based on preset cost rules, selecting the physical execution path with the minimum cost as a physical execution plan, executing the physical execution plan, obtaining a target data set from the unified storage cluster, analyzing the target data set through a preset machine learning analysis model to obtain a deep analysis result set, and generating a final analysis report corresponding to the streaming data based on the deep analysis result set. The method can improve the utilization rate of resources and simultaneously consider real-time data and historical data for analysis.

Inventors

  • TENG ZIWEI

Assignees

  • 三峡大学

Dates

Publication Date
20260512
Application Date
20250915

Claims (7)

  1. 1. The method for intelligent real-time analysis and processing of stream data is characterized by comprising the following steps: Acquiring a query request of streaming data, and analyzing a logic structure of the query request to obtain a corresponding query grammar tree; based on the query grammar tree, reading metadata of a unified storage cluster to generate a physical execution path, wherein the unified storage cluster is a data storage architecture which is updated in real time and is used for storing the streaming data; Estimating the cost of each physical execution path based on a preset cost rule, and selecting the physical execution path with the minimum cost as a physical execution plan; Executing the physical execution plan, acquiring a target data set from the unified storage cluster, and analyzing the target data set through a preset machine learning analysis model to obtain a depth analysis result set, wherein the machine learning analysis model comprises at least one of an anomaly detection model, a classification prediction model, a time sequence prediction model and a clustering model; Generating a final analysis report corresponding to the streaming data based on the depth analysis result set; The analyzing the logic structure of the query request to obtain a corresponding query syntax tree includes: Adding a predefined type tag to each character of the query request to obtain a lexical unit stream, wherein the predefined type tag comprises at least one of a keyword, an identifier, a constant, an operator and a separator; Processing the lexical unit stream by a move-in reduction algorithm based on a predefined context-free grammar to obtain a specific grammar tree; Recombining the logic expression of the concrete grammar tree to generate an abstract grammar tree; Performing context-related check on the abstract syntax tree to obtain the query syntax tree, wherein the context-related check comprises one of whether an identifier exists, whether a data type is compatible and whether a scope is valid; the reading metadata of the unified storage cluster based on the query syntax tree to generate a physical execution path includes: based on the metadata of the unified storage cluster, carrying out semantic analysis on the query grammar tree to obtain a logic query plan; enumerating all feasible physical operator implementation schemes for each logical operator in the logical query plan based on the metadata, and generating a candidate physical execution path set; adding relevant statistical data to the candidate physical execution path set to obtain the physical execution path; wherein the estimating the cost of each physical execution path based on the preset cost rule includes: Determining the execution dependency relationship of the physical operators in the physical execution path to obtain a path sequence set; Calculating cost factors of the physical operators in the path sequence set based on the preset cost rule to obtain a cost path sequence set; Based on the cost path sequence set, calculating the cost of the physical execution path through the following formula: ; Wherein, the For the cost of the physical execution path, In order to calculate the total time consumption of the operation, The total time consumed for data reading and writing, Total time consumption for data network transmissions.
  2. 2. The method of claim 1, wherein the unified storage cluster is constructed by: Acquiring real-time incremental data of stream data and historical data of the stream data, and carrying out standardized processing on the real-time incremental data and the historical data to obtain a standardized event stream; dividing the corresponding data into aging intervals of the corresponding time stamps based on the time stamps of the data in the standardized event stream to obtain an aging tag set; Mapping the access frequency of the data to a corresponding heat label based on a preset grading rule to obtain a heat label set; generating a comprehensive tag data set based on the aging tag set and the heat tag set; and writing the standardized event stream into a corresponding data storage layer based on the comprehensive label data set to obtain a unified storage cluster.
  3. 3. The method according to claim 2, wherein writing the normalized event stream to a corresponding data storage layer based on the integrated tag data set, to obtain a unified storage cluster, comprises: matching the comprehensive tag data set with a preset combination rule to obtain a data storage address; Based on the data storage address, storing the data in the standardized event stream into different data storage layers to obtain a layered storage cluster, wherein the data storage layers comprise at least one of a real-time layer, a near-line layer, a temperature storage layer and a cold storage layer; based on a migration policy engine, migrating the data to different data storage layers to obtain an adaptive storage cluster; Based on the self-adaptive storage cluster, the distribution, index type and statistical information of the data of each layer are automatically extracted, and a global metadata catalog is obtained; And integrating the global metadata catalogue and the adaptive storage cluster to obtain the unified storage cluster.
  4. 4. The method of claim 2, wherein normalizing the real-time delta data and the historical data to obtain a normalized event stream comprises: carrying out structural analysis processing on the real-time incremental data and the historical data to obtain a key value pair set; converting the key value pair set into a unified unit through a preset lightweight decision tree model to obtain a normalized data set, and performing time reference alignment on the normalized data set to obtain a time aligned event sequence; based on the time-aligned sequence of events, the real-time access frequency of the data is calculated by the following formula: ; Wherein, the For the real-time access frequency of data k in the current window, T is the sliding window size, As the current time stamp is to be used, For the event i to be an event i, In order for the data to be of interest, Time for events to be processed; and adding the real-time access frequency in the time-aligned event sequence to obtain the standardized event stream.
  5. 5. A device for intelligent real-time analysis and processing of streaming data, the device comprising: the query module is used for acquiring a query request of streaming data and analyzing a logic structure of the query request to obtain a corresponding query grammar tree; the path module is used for reading metadata of a unified storage cluster based on the query grammar tree and generating a physical execution path, wherein the unified storage cluster is a data storage architecture which is updated in real time and is used for storing the streaming data; The cost module is used for estimating the cost of each physical execution path based on a preset cost rule and selecting the physical execution path with the minimum cost as a physical execution plan; The execution module is used for executing the physical execution plan, acquiring a target data set from the unified storage cluster, and analyzing the target data set through a preset machine learning analysis model to obtain a depth analysis result set, wherein the machine learning analysis model comprises at least one of an anomaly detection model, a classification prediction model, a time sequence prediction model and a clustering model; the analysis module is used for generating a final analysis report corresponding to the streaming data based on the depth analysis result set; wherein, the inquiry module is further used for: Adding a predefined type tag to each character of the query request to obtain a lexical unit stream, wherein the predefined type tag comprises at least one of a keyword, an identifier, a constant, an operator and a separator; Processing the lexical unit stream by a move-in reduction algorithm based on a predefined context-free grammar to obtain a specific grammar tree; Recombining the logic expression of the concrete grammar tree to generate an abstract grammar tree; Performing context-related check on the abstract syntax tree to obtain the query syntax tree, wherein the context-related check comprises one of whether an identifier exists, whether a data type is compatible and whether a scope is valid; wherein, the path module is further configured to: based on the metadata of the unified storage cluster, carrying out semantic analysis on the query grammar tree to obtain a logic query plan; enumerating all feasible physical operator implementation schemes for each logical operator in the logical query plan based on the metadata, and generating a candidate physical execution path set; adding relevant statistical data to the candidate physical execution path set to obtain the physical execution path; wherein, the cost module is further configured to: Determining the execution dependency relationship of the physical operators in the physical execution path to obtain a path sequence set; Calculating cost factors of the physical operators in the path sequence set based on the preset cost rule to obtain a cost path sequence set; Based on the cost path sequence set, calculating the cost of the physical execution path through the following formula: ; Wherein, the For the cost of the physical execution path, In order to calculate the total time consumption of the operation, The total time consumed for data reading and writing, Total time consumption for data network transmissions.
  6. 6. A computer device comprising a memory and a processor, the memory storing a computer program, characterized in that the processor implements the steps of the method of any of claims 1 to 4 when the computer program is executed.
  7. 7. A computer readable storage medium, on which a computer program is stored, characterized in that the computer program, when being executed by a processor, implements the steps of the method of any of claims 1 to 4.

Description

Method, device, equipment and medium for real-time intelligent analysis and processing of streaming data Technical Field The invention belongs to the field of real-time information processing, and particularly relates to a method, a device, equipment and a medium for real-time intelligent analysis and processing of streaming data. Background With the rapid development of big data and real-time computing technology, a streaming data real-time intelligent analysis processing technology is generated, and the technology has the characteristics of low delay, high throughput and expandability, can perform instant processing and analysis on continuously generated data streams, and further leads out complex real-time data and historical data association analysis requirements. In the conventional technology, a mixed architecture of batch processing and stream processing is generally adopted for processing, a batch processing layer is used for processing the whole amount of historical data, a speed layer is used for processing real-time increment data, and the results are combined at a service layer to respond to inquiry, or a single stream processing layer is used for processing all data, but the replay processing of the historical data is still relied on. The traditional architecture at present has the problems that the data consistency is difficult to guarantee, the resource utilization rate is low, and the real-time and historical data association analysis capability is insufficient. Disclosure of Invention Accordingly, in order to solve the above-mentioned problems, it is necessary to provide a method, apparatus, device and medium for real-time intelligent analysis and processing of streaming data, which can improve the resource utilization and correlate and analyze real-time data and history data. In a first aspect, the present application provides a method for real-time intelligent analysis and processing of streaming data, including: Acquiring a query request of streaming data, and analyzing a logic structure of the query request to obtain a corresponding query grammar tree; Based on the query grammar tree, reading metadata of a unified storage cluster to generate a physical execution path, wherein the unified storage cluster is a data storage architecture which is updated in real time and is used for storing streaming data; Estimating the cost of each physical execution path based on a preset cost rule, and selecting the physical execution path with the minimum cost as a physical execution plan; Executing a physical execution plan, acquiring a target data set from the unified storage cluster, and analyzing the target data set through a preset machine learning analysis model to obtain a depth analysis result set, wherein the machine learning analysis model comprises at least one of an anomaly detection model, a classification prediction model, a time sequence prediction model and a clustering model; based on the depth analysis result set, a final analysis report corresponding to the streaming data is generated. Further, the unified storage cluster is constructed by the following method: acquiring real-time incremental data of stream data and historical data of the stream data, and carrying out standardization processing on the original data and the historical data to obtain a standardized event stream; Based on the time stamp of the data in the standardized event stream, dividing the corresponding data into an aging interval of the corresponding time stamp to obtain an aging label set; Mapping the access frequency of the data to the corresponding heat label based on a preset grading rule to obtain a heat label set; Generating a comprehensive tag data set based on the aging tag set and the heat tag set; And writing the standardized event stream into a corresponding data storage layer based on the comprehensive label data set to obtain a unified storage cluster. Further, writing the standardized event stream into a corresponding data storage layer based on the comprehensive label data set to obtain a unified storage cluster, including: matching the comprehensive tag data set with a preset combination rule to obtain a data storage address; Based on the data storage address, storing the data in the standardized event stream into different data storage layers to obtain a layered storage cluster, wherein the data storage layers comprise at least one of a real-time layer, a near-line layer, a temperature storage layer and a cold storage layer; Based on a migration policy engine, migrating data to different data storage layers to obtain a self-adaptive storage cluster; Based on the self-adaptive storage cluster, the distribution, index type and statistical information of each layer of data are automatically extracted, and a global metadata catalog is obtained; and integrating the global metadata catalogue and the adaptive storage cluster to obtain a unified storage cluster. Further, the normalizing processing is perf