CN-122019517-A - Event processing method, device, computer equipment and storage medium
Abstract
The application discloses an event processing method, an event processing device, computer equipment and a storage medium. The method comprises: obtaining original events from each system, converting the original events into standardized event data, and storing the standardized event data in a message queue and a database; reading the standardized event data stored in the message queue, processing the standardized event data according to a preset rule to obtain at least one target event set, and generating a corresponding standard event based on the at least one target event set; in response to a preset condition being triggered, obtaining the standardized event data stored in the database, and judging whether the standardized event data is consistent with the corresponding target event set to obtain a judgment result; and, in response to the standardized event data being inconsistent with the corresponding target event set, generating a new target event set based on the standardized event data stored in the database. The method performs real-time standardization, association and ordering of multi-source heterogeneous events at low latency, and supports precise intervention and consistency verification.
Inventors
- ZHANG YUAN
- YU CHUNZHI
- YANG JIE
- ZHAO YUMENG
- XIN LONG
- XIAO BING
- JIANG SHAOYU
- WANG SHUTING
Assignees
- China Asset Management Co., Ltd. (华夏基金管理有限公司)
Dates
- Publication Date: 2026-05-12
- Application Date: 2025-12-29
Claims (10)
- 1. An event processing method, comprising: acquiring original events from each system, converting the original events into standardized event data, and storing the standardized event data in a message queue and a database; reading the standardized event data stored in the message queue, processing the standardized event data according to a preset rule to obtain at least one target event set, and generating a corresponding standard event based on the at least one target event set; in response to a preset condition being triggered, acquiring the standardized event data stored in the database, and judging whether the standardized event data is consistent with the corresponding target event set to obtain a judgment result; and in response to the judgment result being inconsistent, generating a new target event set based on the standardized event data stored in the database, generating a corresponding new standard event based on the new target event set, and overwriting the standard event.
- 2. The event processing method according to claim 1, wherein reading the standardized event data stored in the message queue, processing the standardized event data according to a preset rule to obtain at least one target event set, and generating a corresponding standard event based on the at least one target event set comprises: classifying standardized event data having the same instruction identifier and the same date into the same instruction event set; in any instruction event set, sorting the standardized event data according to the event timestamp in each piece of standardized event data, and dividing the sorted standardized event data into a plurality of processing windows according to a preset window strategy; in each processing window, screening the standardized event data according to event types and business rules, deleting invalid events and retaining valid events; updating an intermediate processing state corresponding to the instruction event set based on the valid events within each processing window; generating a target event set corresponding to the instruction event set according to the updated intermediate processing state and the valid events in each processing window; and constructing a standard event structure based on the target event set, and generating standard events according to a unified field format.
- 3. The event processing method according to claim 2, wherein, in any instruction event set, sorting the standardized event data according to the event timestamp in each piece of standardized event data, and dividing the sorted standardized event data into a plurality of processing windows according to a preset window strategy comprises: in response to the standardized event data in the instruction event set, extracting the event timestamp corresponding to each piece of standardized event data, and sorting the standardized event data in order of the event timestamps; determining window boundaries based on preset window parameters, wherein the window parameters comprise at least a window step and a window duration; and dividing the sorted standardized event data into a plurality of processing windows according to the window boundaries, so that standardized event data whose event timestamps fall within the same window boundary range are assigned to the same processing window, and adjacent processing windows slide relative to each other by the window step.
- 4. The event processing method according to claim 2, wherein updating the intermediate processing state corresponding to the instruction event set based on the valid events within each processing window comprises: in response to detecting that the instruction event set is being processed for the first time, initializing an intermediate processing state corresponding to the instruction event set according to preset initial values, wherein the intermediate processing state comprises an accumulated quantity field, an accumulated amount field and a business state field; sequentially reading the quantity field, the price field and the event type field of the valid events in each processing window in order of the timestamp fields of the valid events; updating the accumulated quantity field and the accumulated amount field based on the quantity field and the price field, and updating the business state field in the intermediate processing state according to a preset state update rule; and in response to completion of processing of the valid events within the current processing window, storing the updated intermediate processing state and using it as the initial state for the state update of the next processing window.
- 5. The event processing method according to claim 1, wherein the event processing method is executed as a stream processing job, and the method further comprises: establishing mutually independent runtime environments for different stream processing jobs; in each runtime environment, creating and storing, through a static singleton class, a runtime environment instance corresponding to the job to which it belongs; and when stream processing operators are initialized, obtaining the runtime environment instance of the job to which they belong through the static singleton class, so that different stream processing operators in the same job share that job's runtime environment instance, different jobs do not access each other's runtime environment instances, and the corresponding runtime environment instance is released when each job ends.
- 6. The event processing method according to claim 1, wherein, in response to a preset condition being triggered, acquiring the standardized event data stored in the database, and judging whether the standardized event data is consistent with the corresponding target event set to obtain a judgment result comprises: in response to detecting that the preset condition is triggered, obtaining standardized event data to be verified from the database, and obtaining the target event set corresponding to the standardized event data to be verified; matching the standardized event data to be verified against the target event set according to the event type and a preset set of event key fields, and judging, based on the matching result, whether the numbers of events are consistent and whether the field values and field structures of the corresponding events meet a preset consistency requirement; and in response to the numbers of events being consistent and the corresponding events passing the consistency check, determining the judgment result to be consistent, and otherwise determining the judgment result to be inconsistent.
- 7. The event processing method according to claim 1, wherein, before storing the standardized event data in the message queue and the database, the method further comprises: generating an idempotent identifier corresponding to the original event based on the business key fields in the original event; performing an existence check on the idempotent identifier, and, in response to the idempotent identifier existing in an idempotent check table, discarding the corresponding original event and terminating the processing flow; and in response to the idempotent identifier not existing in the idempotent check table, writing the idempotent identifier and the corresponding processing state into the idempotent check table as an atomic operation, and continuing with the storing operation.
- 8. An event processing apparatus, the apparatus comprising: an event acquisition module, configured to acquire original events from each system, convert the original events into standardized event data, and store the standardized event data in a message queue and a database; an event processing module, configured to read the standardized event data stored in the message queue, process the standardized event data according to a preset rule to obtain at least one target event set, and generate a corresponding standard event based on the at least one target event set; an event verification module, configured to, in response to a preset condition being triggered, acquire the standardized event data stored in the database and judge whether the standardized event data is consistent with the corresponding target event set to obtain a judgment result; and an execution module, configured to, in response to the judgment result being inconsistent, generate a new target event set based on the standardized event data stored in the database, generate a corresponding new standard event based on the new target event set, and overwrite the standard event.
- 9. A computer device comprising a memory, a processor and a computer program stored on the memory and executable on the processor, characterized in that the processor, when executing the computer program, implements the steps of the method according to any one of claims 1 to 7.
- 10. A computer-readable storage medium on which a computer program is stored, characterized in that the computer program, when executed by a processor, implements the steps of the method according to any one of claims 1 to 7.
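The window division of claim 3 and the state update of claim 4 can be illustrated with a minimal Python sketch. It is not the applicant's implementation: the `Event` and `State` field names, the `"FILL"` event type, the `order_quantity` parameter and the "PARTIAL"/"FILLED" state rule are all assumptions made for illustration, since the claims leave the business rules and the state update rule unspecified.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Event:
    instruction_id: str
    timestamp: int   # event timestamp; the unit (seconds) is an assumption
    event_type: str  # hypothetical types such as "FILL" or "CANCEL"
    quantity: int
    price: float


@dataclass
class State:
    # Intermediate processing state: accumulated quantity, accumulated
    # amount and business state, initialized to assumed preset values.
    total_quantity: int = 0
    total_amount: float = 0.0
    status: str = "NEW"


def sliding_windows(events: List[Event], duration: int, step: int) -> List[List[Event]]:
    """Sort events by timestamp and assign them to windows
    [start, start + duration), where start advances by `step`."""
    ordered = sorted(events, key=lambda e: e.timestamp)
    if not ordered:
        return []
    windows = []
    start, last = ordered[0].timestamp, ordered[-1].timestamp
    while start <= last:
        end = start + duration
        windows.append([e for e in ordered if start <= e.timestamp < end])
        start += step
    return windows


def apply_window(state: State, window: List[Event], order_quantity: int) -> State:
    """Screen out invalid events, then fold the valid ones into the state.
    The returned state is the initial state for the next window."""
    for e in window:
        if e.event_type != "FILL" or e.quantity <= 0:
            continue  # hypothetical business rule: only positive fills are valid
        state.total_quantity += e.quantity
        state.total_amount += e.quantity * e.price
        # Hypothetical state update rule.
        state.status = "FILLED" if state.total_quantity >= order_quantity else "PARTIAL"
    return state


events = [
    Event("A", 3, "FILL", 40, 10.0),
    Event("A", 0, "FILL", 10, 10.0),
    Event("A", 5, "CANCEL", 0, 0.0),
    Event("A", 12, "FILL", 50, 11.0),
]
state = State()
for window in sliding_windows(events, duration=5, step=5):
    state = apply_window(state, window, order_quantity=100)
```

The usage lines set the step equal to the duration so that each event falls into exactly one window and is accumulated once. With a step shorter than the duration the windows overlap and the same event appears in more than one window; how the method avoids double counting in that case is not stated in the claims, so the sketch does not attempt it.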
Description
Event processing method, device, computer equipment and storage medium
Technical Field
The present application relates to the field of information processing technologies, and in particular to an event processing method, an event processing device, a computer device, and a storage medium.
Background
In an event processing scenario, messages from different channels and systems (dealer counters, houses, quotation systems, instruction systems, etc.) together form a complete business chain from order placement and entrustment through execution to clearing. These messages have heterogeneous formats and obvious semantic differences, and they are highly sensitive to event ordering and to the associations between events. The platform must complete cross-source standardization, association and de-duplication at low latency, must support tracing back and auditing, and must stably output standard events and account results for real-time risk control, position monitoring and compliance inspection.
There are two general implementation paths. The first is batch processing: a scheduled job cleans, standardizes and summarizes the messages within a given time window to generate standard instruction and transaction events, and the completeness and accuracy of the results are ensured by reviewing the full data set. The second is stream processing: a message queue and a stream engine provide real-time ingestion, standardization, grouping, sorting and window calculation, business entity attributes are completed within the pipeline, and the results are quickly persisted to a database and can be replayed.
However, these schemes have the following problems: code is difficult to reuse, the capability for precise intervention is insufficient, attribute completion is costly, out-of-order processing is inefficient, and consistency and throughput are difficult to achieve together.
Disclosure of Invention
The application provides an event processing method, an event processing device, computer equipment and a storage medium, which can perform real-time standardization, association and ordering of multi-source heterogeneous events at low latency.
In one aspect, the present application provides an event processing method, including: acquiring original events from each system, converting the original events into standardized event data, and storing the standardized event data in a message queue and a database; reading the standardized event data stored in the message queue, processing the standardized event data according to a preset rule to obtain at least one target event set, and generating a corresponding standard event based on the at least one target event set; in response to a preset condition being triggered, acquiring the standardized event data stored in the database, and judging whether the standardized event data is consistent with the corresponding target event set to obtain a judgment result; and in response to the judgment result being inconsistent, generating a new target event set based on the standardized event data stored in the database, generating a corresponding new standard event based on the new target event set, and overwriting the standard event.
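The four steps of the method can be sketched in Python as follows. This is an illustrative sketch only: the in-memory `deque` and list stand in for the message queue and the database, and the raw field names, `KEY_FIELDS`, `standardize`, `build_standard_event` and the `Pipeline` class are hypothetical names that do not appear in the application. The consistency check compares the number of events and the values of the assumed key fields.

```python
from collections import defaultdict, deque

KEY_FIELDS = ("type", "ts", "qty")  # assumed set of event key fields


def standardize(raw):
    # Hypothetical mapping from a source-specific record to a unified format.
    return {"instruction_id": raw["id"], "ts": raw["time"],
            "type": raw["kind"].upper(), "qty": raw["qty"]}


def build_standard_event(instruction_id, events):
    # Hypothetical standard event: a summary over the target event set.
    return {"instruction_id": instruction_id,
            "event_count": len(events),
            "total_qty": sum(e["qty"] for e in events)}


def signature(events):
    # Order-independent view of a set of events over the key fields.
    return sorted(tuple(e[f] for f in KEY_FIELDS) for e in events)


class Pipeline:
    def __init__(self):
        self.queue = deque()       # stands in for the message queue
        self.database = []         # stands in for the database
        self.target_sets = defaultdict(list)
        self.standard_events = {}

    def ingest(self, raw):
        """Step 1: standardize and store in both the queue and the database."""
        event = standardize(raw)
        self.queue.append(event)
        self.database.append(event)

    def process_queue(self):
        """Step 2: build target event sets and standard events from the queue."""
        while self.queue:
            event = self.queue.popleft()
            self.target_sets[event["instruction_id"]].append(event)
        for key, events in self.target_sets.items():
            self.standard_events[key] = build_standard_event(key, events)

    def verify_and_repair(self):
        """Steps 3 and 4: compare the database against each target event set
        and, where they differ, rebuild the set and overwrite the standard
        event. Returns the identifiers that were repaired."""
        stored = defaultdict(list)
        for event in self.database:
            stored[event["instruction_id"]].append(event)
        repaired = []
        for key, events in stored.items():
            if signature(events) != signature(self.target_sets.get(key, [])):
                self.target_sets[key] = sorted(events, key=lambda e: e["ts"])
                self.standard_events[key] = build_standard_event(key, self.target_sets[key])
                repaired.append(key)
        return repaired


pipeline = Pipeline()
for t, q in [(1, 10), (2, 20), (3, 30)]:
    pipeline.ingest({"id": "A", "time": t, "kind": "fill", "qty": q})
pipeline.queue.pop()               # simulate a message lost on the stream path
pipeline.process_queue()
repaired = pipeline.verify_and_repair()
```

In this sketch the database acts as the source of truth: the stream path produces results quickly, and the triggered verification step corrects them when the stream path has missed or mis-ordered events. What triggers the verification (a timer, a manual request, or another condition) is left open, as in the text above.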
In another aspect, the present application provides an event processing apparatus, the apparatus comprising: an event acquisition module, configured to acquire original events from each system, convert the original events into standardized event data, and store the standardized event data in the message queue and the database; an event processing module, configured to read the standardized event data stored in the message queue, process the standardized event data according to a preset rule to obtain at least one target event set, and generate a corresponding standard event based on the at least one target event set; an event verification module, configured to, in response to a preset condition being triggered, acquire the standardized event data stored in the database and judge whether the standardized event data is consistent with the corresponding target event set to obtain a judgment result; and an execution module, configured to, in response to the judgment result being inconsistent, generate a new target event set based on the standardized event data stored in the database, generate a corresponding new standard event based on the new target event set, and overwrite the standard event.
In yet another aspect, the present application provides a computer-readable storage medium having stored thereon a computer program which, when executed by a processor, implements the event processing method of the first aspect.
In yet another aspect, the present application provides a computer device including a memory, a processor, and a computer program stored on the memory and executable on the processor, the processor implementing the event processing method of the first aspect when executing the computer program.
The method comprises the steps of accessing original events of all systems through a system, completing field and time