Search

CN-121996365-A - Data processing method and related device

CN121996365ACN 121996365 ACN121996365 ACN 121996365ACN-121996365-A

Abstract

The application discloses a data processing method and a related device, wherein a first backlog data amount of a message queue at a first time is obtained, a second backlog data amount of the message queue in a first time period is obtained, the first time period is a time period before the first time period, whether the message queue meets a thread pool adjusting condition is determined based on the first backlog data amount and the second backlog data amount, the thread number of a current thread pool of the message queue is adjusted under the condition that the message queue meets the thread pool adjusting condition, the first thread pool of the message queue is obtained, and data in the message queue is processed based on the first thread pool. The application can track the running state of the message queue in real time, thereby accurately determining the adjustment time, further accurately and dynamically adjusting the thread pool of the message queue and avoiding the occurrence of abnormality of the message queue.

Inventors

  • DENG CHENGDONG

Assignees

  • 马上消费金融股份有限公司

Dates

Publication Date
20260508
Application Date
20241105

Claims (13)

  1. 1. A data processing method, characterized in that the data processing method comprises: acquiring a first backlog data amount of a message queue at a first time, and acquiring a second backlog data amount of the message queue in a first time period, wherein the first time period is a time period before the first time; determining whether the message queue meets a thread pool adjustment condition based on the first backlog data amount and the second backlog data amount; under the condition that the message queue meets the thread pool adjustment condition, adjusting the thread number of the current thread pool of the message queue to obtain a first thread pool of the message queue; Processing data in the message queue based on the first thread pool.
  2. 2. The data processing method of claim 1, wherein the obtaining the second backlog data amount of the message queue for the first period of time comprises: acquiring a third backlog data amount of the message queue at a plurality of second times in the first period; A second backlog data amount is determined based on the third backlog data amount for the plurality of second times.
  3. 3. The data processing method of claim 1, wherein the method further comprises: determining a plurality of second time periods located before the first time; determining, for each of the second time periods, difference information of the second time period from the first time; a first period of time is determined from the plurality of second periods of time based on difference information of each of the second periods of time from the first time.
  4. 4. A data processing method according to claim 3, wherein for each second period, the difference information of the second period and the first time includes backlog data difference information and a time interval, the backlog data difference information including the same type of backlog data or different types of backlog data; The determining a first period from a plurality of the second periods based on difference information of each of the second periods and the first time includes: Determining a second period with the same backlog data type as a third period in the second periods; and determining a third time period, of the third time periods, the time interval of which meets a preset time condition, as the first time period.
  5. 5. The data processing method of claim 1, wherein the determining whether the message queue satisfies a thread pool adjustment condition based on the first backlog data amount and the second backlog data amount comprises: Acquiring a data volume difference value and a data volume ratio between the first backlog data volume and the second backlog data volume; and determining whether the message queue meets a thread pool adjusting condition based on the magnitude relation between the data volume difference value and the first preset difference value and the second preset difference value and the magnitude relation between the data volume ratio value and the first preset ratio value and the second preset ratio value.
  6. 6. The method according to claim 5, wherein determining whether the message queue satisfies a thread pool adjustment condition based on a magnitude relation between the data amount difference and the first and second preset differences and a magnitude relation between the data amount ratio and the first and second preset ratios, comprises: When the data volume difference value is larger than a first preset difference value and the data volume ratio is larger than a first preset ratio value, or the data volume difference value is smaller than or equal to a second preset difference value and the data volume ratio is smaller than or equal to a second preset ratio value, determining that the message queue meets a thread pool adjustment condition; The first preset difference value is larger than the second preset difference value, and the first preset ratio difference value is larger than the second preset ratio value.
  7. 7. The method of claim 1, wherein said adjusting the number of threads of the current thread pool of the message queue to obtain the first thread pool of the message queue comprises: Determining a thread pool adjustment mode of the message queue based on the first backlog data amount and the second backlog data amount; and adjusting the thread quantity of the current thread pool of the message queue based on the thread pool adjusting mode and the preset thread adjusting quantity to obtain the first thread pool of the message queue.
  8. 8. The method of claim 7, wherein determining a thread pool adjustment of the message queue based on the first backlog data amount and the second backlog data amount comprises: When the data volume difference between the first backlog data volume and the second backlog data volume is larger than a first preset difference value and the data volume ratio between the first backlog data volume and the second backlog data volume is larger than a first preset ratio, determining that the thread pool adjustment mode is a capacity expansion mode; and when the data volume difference value is smaller than or equal to a second preset difference value and the data volume ratio is smaller than or equal to a second preset ratio, determining that the thread pool adjustment mode is a capacity reduction mode.
  9. 9. The method of claim 8, wherein determining that the thread pool adjustment mode is a capacity expansion mode when a data amount difference between the first backlog data amount and the second backlog data amount is greater than a first preset difference and a data amount ratio between the first backlog data amount and the second backlog data amount is greater than a first preset ratio comprises: when the data volume difference between the first backlog data volume and the second backlog data volume is larger than a first preset difference value and the data volume ratio between the first backlog data volume and the second backlog data volume is larger than a first preset ratio, acquiring the current thread activity of the current thread pool; and when the current thread activity is greater than a first preset activity, determining that the thread pool adjustment mode is a capacity expansion mode.
  10. 10. A data processing apparatus, characterized in that the data processing apparatus comprises: The system comprises an acquisition module, a storage module and a storage module, wherein the acquisition module is used for acquiring a first backlog data amount of a message queue at a first time and acquiring a second backlog data amount of the message queue in a first time period, and the first time period is a time period before the first time; a determining module configured to determine, based on the first backlog data amount and the second backlog data amount, whether the message queue satisfies a thread pool adjustment condition; the adjusting module is used for adjusting the number of threads in the current thread pool of the message queue to obtain a first thread pool of the message queue under the condition that the message queue meets the thread pool adjusting condition; And the processing module is used for processing the data in the message queue based on the first thread pool.
  11. 11. An electronic device comprising a memory storing a computer program and a processor for running the computer program in the memory to perform the steps of the data processing method of any of claims 1 to 9.
  12. 12. A computer readable storage medium, characterized in that it stores a plurality of instructions adapted to be loaded by a processor for performing the steps in the data processing method according to any of claims 1 to 9.
  13. 13. A computer program product comprising a computer program or instructions which, when executed by a processor, carries out the steps of the data processing method of any one of claims 1 to 9.

Description

Data processing method and related device Technical Field The present application relates to the field of data processing technologies, and in particular, to a data processing method and a related device. Background Asynchronous parallel processing is typically performed in data processing using a combination of message middleware (e.g., kafka) plus thread pool techniques. After the initialization of the thread pool is completed when the system is started after the system configuration file and other modes are defined, a certain amount of data is pulled from the data by consuming the theme (Topic) of Kafka during service processing, the threads are taken out from the corresponding thread pool to process the data, and the threads are returned to the thread pool after the processing is completed. However, when some services temporarily increase or decrease, resources in a thread pool of a message queue need to be dynamically adjusted in time to avoid the occurrence of an abnormality of the message queue, but the prior art cannot accurately determine adjustment time, so that the thread pool of the message queue cannot be accurately and dynamically adjusted, and the message queue is abnormal. Disclosure of Invention The embodiment of the application provides a data processing method and a related device, which can accurately and dynamically adjust a thread pool of a message queue and avoid the occurrence of abnormality of the message queue. In a first aspect, the present application provides a data processing method, including: acquiring a first backlog data amount of a message queue at a first time, and acquiring a second backlog data amount of the message queue in a first time period, wherein the first time period is a time period before the first time; determining whether the message queue meets a thread pool adjustment condition based on the first backlog data amount and the second backlog data amount; under the condition that the message queue meets the thread pool adjustment condition, adjusting the thread number of the current thread pool of the message queue to obtain a first thread pool of the message queue; Processing data in the message queue based on the first thread pool. In a second aspect, the present application provides a data processing apparatus comprising: The system comprises an acquisition module, a storage module and a storage module, wherein the acquisition module is used for acquiring a first backlog data amount of a message queue at a first time and acquiring a second backlog data amount of the message queue in a first time period, and the first time period is a time period before the first time; a determining module configured to determine, based on the first backlog data amount and the second backlog data amount, whether the message queue satisfies a thread pool adjustment condition; the adjusting module is used for adjusting the number of threads in the current thread pool of the message queue to obtain a first thread pool of the message queue under the condition that the message queue meets the thread pool adjusting condition; And the processing module is used for processing the data in the message queue based on the first thread pool. In a third aspect, the present application provides an electronic device, including a memory and a processor, where the memory stores a computer program, and the processor is configured to execute the computer program in the memory, to implement steps in the data processing method provided by the present application. In a fourth aspect, the present application provides a computer readable storage medium storing a plurality of instructions adapted to be loaded by a processor for implementing the steps in the data processing method provided by the present application. In a fifth aspect, the present application provides a computer program product comprising a computer program or instructions which, when executed by a processor, carries out the steps of the data processing method provided by the present application. Compared with the related art, the method and the device for processing the data in the message queue in the application have the advantages that the first backlog data quantity of the message queue at the first time is obtained, the second backlog data quantity of the message queue at the first time is obtained, the first time is a time period before the first time, whether the message queue meets the thread pool adjusting condition or not is determined based on the first backlog data quantity and the second backlog data quantity, the thread quantity of the current thread pool of the message queue is adjusted under the condition that the message queue meets the thread pool adjusting condition, the first thread pool of the message queue is obtained, and the data in the message queue is processed based on the first thread pool. The application takes the second backlog data quantity of the message queue in the first time period before the first