CN-121996918-A - Stream processing method for real-time cleaning of agricultural product marketing data

Abstract

The invention provides a stream processing method for real-time cleaning of agricultural product marketing data, belonging to the technical field of big data processing. The method constructs a field dependency directed graph and quantifies dependency strength with mutual information; uses a minimum-cut algorithm to decompose the dependency network into a set of weakly coupled subgraphs, inserting intermediate materialized nodes to achieve decoupling; performs Hamiltonian-path-heuristic operator fusion optimization on each subgraph to generate a super operator, which a just-in-time compiler converts into machine code; establishes a multi-level cache affinity scheduling mechanism and a dynamic watermark mechanism to handle out-of-order data; adopts a two-stage dynamic repartitioning strategy and work-stealing queues to cope with data skew; and constructs a tiered storage architecture whose capacity allocation is adjusted through a storage redundancy optimization function. The method thereby solves the technical problem of limited parallel processing capability caused by strong dependencies among fields during real-time cleaning of agricultural product marketing data.

Inventors

  • XU MIN
  • YU LI
  • HAN QI
  • ZHANG XUANYA

Assignees

  • Wuhan Technical College of Communications (武汉交通职业学院)

Dates

Publication Date
2026-05-08
Application Date
2026-01-22

Claims (10)

  1. A stream processing method for real-time cleaning of agricultural product marketing data, characterized by comprising the steps of: constructing a field dependency directed graph of the agricultural product marketing data stream, taking all fields in the data stream as nodes and the cleaning-rule dependencies among the fields as directed edges, and quantifying dependency strength by calculating the mutual information between fields and assigning it as the edge weight; executing a graph-theoretic, minimum-cut-driven data dependency decoupling algorithm on the field dependency directed graph, using a minimum-cut algorithm to calculate an optimal partitioning scheme, decomposing the directed graph into a plurality of subgraph sets with minimum dependency strength, and inserting the strong dependency rules corresponding to the cut edges into intermediate materialized nodes to achieve dependency decoupling; and executing a Hamiltonian-path-heuristic operator fusion optimization algorithm on the cleaning operator sequence in each subgraph set, constructing a fusion candidate graph with operators as vertices, solving for the optimal fusion path using dynamic programming combined with a pruning strategy, generating a fused super operator, and converting the fused super operator into machine code through a just-in-time compiler.
  2. The stream processing method for real-time cleaning of agricultural product marketing data according to claim 1, wherein the mutual information in the field dependency directed graph is calculated as follows: for a field X and a field Y, the joint probability distribution and the marginal probability distributions are estimated by counting; the mutual information represents the degree of correlation between the two fields, a larger value indicating a higher dependency strength; and the result is normalized to the range 0 to 1 and used as the edge weight.
  3. The stream processing method for real-time cleaning of agricultural product marketing data according to claim 2, wherein the minimum-cut algorithm is executed as follows: all nodes are initialized as independent sets; a node merging operation is executed iteratively, each iteration selecting the node with the largest connection weight to the current set and adding it to that set; the cut-set weight of each iteration is recorded; and after (number of nodes minus 1) iterations, the cut set with the smallest weight is selected as the optimal partitioning scheme.
  4. The stream processing method for real-time cleaning of agricultural product marketing data according to claim 3, wherein the intermediate materialized node is a buffer node inserted into the data stream processing pipeline to temporarily store the output of the preceding cleaning operator, breaking the original direct dependency between fields so that a subsequent operator can read data from the intermediate materialized node instead of waiting for the preceding operator to complete, thereby decoupling the dependency chain.
  5. The stream processing method for real-time cleaning of agricultural product marketing data according to claim 4, wherein the Hamiltonian-path-heuristic operator fusion optimization algorithm defines a fusion benefit function that jointly accounts for the benefits of eliminating serialization overhead, improving cache locality, and reducing function call overhead, constrains the fused code size so that it does not exceed the instruction cache capacity of the processor, and reduces the search space through state compression.
  6. The stream processing method for real-time cleaning of agricultural product marketing data according to claim 5, wherein the super operator is a composite operator formed by fusing a plurality of lightweight cleaning operators, including filtering operations, mapping transformation operations, and data validation operations, and wherein data passed inside the fused super operator requires no serialization or deserialization and is transferred directly through registers or the first-level cache, reducing the number of memory accesses.
  7. The stream processing method for real-time cleaning of agricultural product marketing data according to claim 6, wherein the just-in-time compiler uses runtime code generation to convert the fused super operator logic into machine code optimized for the current processor architecture, including vectorized instruction optimization, loop unrolling optimization, and branch prediction optimization, the compiled machine code residing in the instruction cache for repeated calls.
  8. The stream processing method for real-time cleaning of agricultural product marketing data according to claim 7, further comprising establishing a multi-level cache affinity scheduling mechanism after the super operator is generated, wherein a hardware performance counter monitors the data access pattern, associated data streams are bound to the same processor cache line, and, under a non-uniform memory access architecture, data is allocated to the local memory node of the processor on which the thread runs.
  9. The stream processing method for real-time cleaning of agricultural product marketing data according to claim 8, further comprising setting up a dynamic watermark mechanism based on quantile statistics after the multi-level cache affinity scheduling mechanism is established, determining a time-sequence tolerance window from the 99th percentile delay value of the historical data delay distribution, and sorting the data arriving within the time-sequence tolerance window by timestamp before sending it to the cleaning pipeline for processing.
  10. The stream processing method for real-time cleaning of agricultural product marketing data according to claim 9, wherein the quantile statistics of the dynamic watermark mechanism are computed by collecting data delay samples over a period of time and arranging them in ascending order of delay, the 99th percentile delay value being the value below which 99% of the data delays fall; the 99th percentile delay value is used as the upper bound of the time-sequence tolerance window, and data exceeding the upper bound is judged to be late data.
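
As an illustration of the quantile-based watermark in claims 9 and 10, the following is a minimal Python sketch, not the patented implementation. The function names, the record layout `(event_ms, arrival_ms, payload)`, and the use of the nearest-rank percentile definition are assumptions made for this example; the claims do not specify them.

```python
def p99_tolerance_window(delay_samples_ms):
    """Upper bound of the tolerance window: the 99th percentile delay.

    Assumption: nearest-rank percentile over the collected samples.
    """
    if not delay_samples_ms:
        raise ValueError("at least one delay sample is required")
    ordered = sorted(delay_samples_ms)      # ascending order of delay
    n = len(ordered)
    rank = (99 * n + 99) // 100             # ceil(0.99 * n), 1-based rank
    return ordered[rank - 1]


def split_on_time_and_late(records, window_ms):
    """Split records into on-time (sorted by event time) and late.

    Each record is a hypothetical tuple (event_ms, arrival_ms, payload);
    its delay is arrival_ms - event_ms.
    """
    on_time, late = [], []
    for event_ms, arrival_ms, payload in records:
        delay = arrival_ms - event_ms
        target = on_time if delay <= window_ms else late
        target.append((event_ms, arrival_ms, payload))
    on_time.sort(key=lambda r: r[0])        # restore timestamp order
    return on_time, late


if __name__ == "__main__":
    history = list(range(1, 101))           # illustrative delays in ms
    window = p99_tolerance_window(history)
    batch = [(1000, 1050, "a"), (900, 1100, "b"), (950, 960, "c")]
    ready, late = split_on_time_and_late(batch, window)
    print(window, ready, late)
```

Recomputing the window periodically from a sliding set of delay samples is what would make the watermark "dynamic"; how often to recompute is left open here.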

Description

Stream processing method for real-time cleaning of agricultural product marketing data

Technical Field

The invention belongs to the technical field of big data processing, and particularly relates to a stream processing method for real-time cleaning of agricultural product marketing data.

Background

In the field of real-time cleaning of agricultural product marketing data, a traditional stream processing system generally adopts a data stream processing framework based on a directed acyclic graph. A data processing pipeline is constructed by defining the upstream and downstream dependencies of cleaning operators, and each cleaning operator executes filtering, validation, and mapping transformation operations in topological order, thereby improving the quality of the raw marketing data.

On current large-scale agricultural product e-commerce platforms, the marketing data stream contains multi-dimensional fields such as commodity information, price fluctuations, inventory status, and order records, and complex cleaning-rule dependencies exist among the fields. For example, price plausibility validation depends on commodity category determination, and order validity checking depends on inventory data validation.

However, the traditional cleaning process treats all field dependencies as equally strong constraints and adopts either global serialization or coarse-grained grouped parallel processing. As a result, even when only weak dependencies exist among some fields, execution must strictly follow the dependency order, which forms a long, blocking critical path. That is, the prior art suffers from the technical problem that parallel processing capability is limited by strong dependencies among fields during real-time cleaning of agricultural product marketing data.

Disclosure of Invention

In view of the above, the invention provides a stream processing method for real-time cleaning of agricultural product marketing data, which can solve the prior-art technical problem that parallel processing capability is limited by strong dependencies among fields during real-time cleaning of agricultural product marketing data.

The method comprises the following steps. First, a field dependency directed graph of the agricultural product marketing data stream is constructed, taking all fields in the data stream as nodes and the cleaning-rule dependencies among the fields as directed edges; dependency strength is quantified by calculating the mutual information between fields and assigning it as the edge weight. Second, a graph-theoretic, minimum-cut-driven data dependency decoupling algorithm is executed on the field dependency directed graph: a minimum-cut algorithm calculates an optimal partitioning scheme, the directed graph is decomposed into a plurality of subgraph sets with minimum dependency strength, and the strong dependency rules corresponding to the cut edges are inserted into intermediate materialized nodes to achieve dependency decoupling. Third, a Hamiltonian-path-heuristic operator fusion optimization algorithm is executed on the cleaning operator sequence in each subgraph set: a fusion candidate graph is constructed with operators as vertices, the optimal fusion path is solved using dynamic programming combined with a pruning strategy, and a fused super operator is generated and converted into machine code through a just-in-time compiler.

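
The first two steps (mutual-information edge weights, then a minimum cut) can be sketched in Python as follows. This is an illustration under stated assumptions, not the patented implementation: the normalization divides by min(H(X), H(Y)), which is one of several ways to map mutual information into the range 0 to 1; the cut routine follows the Stoer-Wagner scheme, which matches the merge-and-record procedure the claims describe but treats the dependency edges as undirected; and all function and field names are hypothetical.

```python
import math
from collections import Counter


def normalized_mutual_information(xs, ys):
    """Mutual information of two discrete fields, scaled to [0, 1].

    Assumption: normalization by min(H(X), H(Y)).
    """
    n = len(xs)
    px, py, pxy = Counter(xs), Counter(ys), Counter(zip(xs, ys))
    mi = sum((c / n) * math.log((c * n) / (px[x] * py[y]))
             for (x, y), c in pxy.items())
    hx = -sum((c / n) * math.log(c / n) for c in px.values())
    hy = -sum((c / n) * math.log(c / n) for c in py.values())
    denom = min(hx, hy)
    if denom <= 0:                       # a constant field carries no information
        return 0.0
    return min(1.0, max(0.0, mi / denom))


def stoer_wagner_min_cut(nodes, weights):
    """Global minimum cut; weights maps (u, v) -> weight, read as undirected.

    Returns (cut_weight, set_of_fields_on_one_side).
    """
    adj = {u: {} for u in nodes}
    for (u, v), w in weights.items():
        if u != v:
            adj[u][v] = adj[u].get(v, 0.0) + w
            adj[v][u] = adj[v].get(u, 0.0) + w
    groups = {u: {u} for u in nodes}     # original fields inside each merged node
    best_weight, best_side = math.inf, set()
    while len(adj) > 1:                  # runs (number of nodes - 1) phases
        start = next(iter(adj))
        order = [start]
        conn = {v: adj[start].get(v, 0.0) for v in adj if v != start}
        while conn:
            nxt = max(conn, key=conn.get)    # most tightly connected node
            cut_of_phase = conn.pop(nxt)
            order.append(nxt)
            for v in conn:
                conn[v] += adj[nxt].get(v, 0.0)
        s, t = order[-2], order[-1]
        if cut_of_phase < best_weight:       # record the lightest cut seen
            best_weight, best_side = cut_of_phase, set(groups[t])
        for v, w in adj[t].items():          # merge t into s
            if v != s:
                adj[s][v] = adj[s].get(v, 0.0) + w
                adj[v][s] = adj[v].get(s, 0.0) + w
            del adj[v][t]
        del adj[t]
        groups[s] |= groups.pop(t)
    return best_weight, best_side


if __name__ == "__main__":
    fields = ["category", "price", "stock", "order"]
    edges = {("category", "price"): 0.9, ("stock", "order"): 0.8,
             ("price", "order"): 0.1}
    print(stoer_wagner_min_cut(fields, edges))
```

In this toy graph the weak price-to-order dependency is the edge that gets cut, so it is the one that would be routed through an intermediate materialized node, while the two strongly coupled pairs stay together in their own subgraphs.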
The mutual information in the field dependency directed graph is calculated as follows: for a field X and a field Y, the joint probability distribution and the marginal probability distributions of the two fields are estimated by counting. The mutual information represents the degree of correlation between the two fields; the larger the value, the higher the dependency strength. The result is normalized to the range 0 to 1 and used as the edge weight.

The minimum-cut algorithm is executed as follows: all nodes are initialized as independent sets, and a node merging operation is executed iteratively. Each iteration selects the node with the largest connection weight to the current set and adds it to that set, and the cut-set weight of each iteration is recorded. After (number of nodes minus 1) iterations, the cut set with the smallest weight is selected as the optimal partitioning scheme.

The intermediate materialized node is a buffer node inserted into the data stream processing pipeline. It temporarily stores the output of the preceding cleaning operator and breaks the original direct dependency between fields, so that a subsequent operator can read data from the intermediate materialized node instead of waiting for the preceding operator to complete, realizing the decoupling of the dependency c