826220679@qq.com
15 小时以前 ea135161eff1dd7c71c159be948e93b50fd1db81
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package publicsWay;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import databases.DBConnector;
import scheduled_task.TrackTableManager;
 
public class TrackDataBatchInserter {
    private static final int BATCH_SIZE = 1000;
    private static final long BATCH_INTERVAL_MS = 1000;
    private static final int MAX_QUEUE_SIZE = 100000;
    
    private static final ConcurrentLinkedQueue<TrackData> trackDataQueue = new ConcurrentLinkedQueue<>();
    private static final ScheduledExecutorService batchInsertExecutor = Executors.newSingleThreadScheduledExecutor();
    
    static {
        batchInsertExecutor.scheduleAtFixedRate(() -> {
            try {
                batchInsertTrackData();
            } catch (Exception e) {
                System.err.println("ÅúÁ¿²åÈëÊý¾ÝÒì³£: " + e.getMessage());
            }
        }, BATCH_INTERVAL_MS, BATCH_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }
    
 
    // Ìí¼ÓÊý¾Ýµ½¶ÓÁÐ
    public static void addTrackData(TrackData data) {
        if (trackDataQueue.size() < MAX_QUEUE_SIZE) {
            trackDataQueue.offer(data);
        }
    }
    
    private static void batchInsertTrackData() {
        if (trackDataQueue.isEmpty()) {
            return;
        }
        
        String tableName = "tb_track_" + LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE);
        
        List<TrackData> batchList = new ArrayList<>(BATCH_SIZE);
        int count = 0;
        
        while (count < BATCH_SIZE && !trackDataQueue.isEmpty()) {
            TrackData data = trackDataQueue.poll();
            if (data != null) {
                batchList.add(data);
                count++;
            }
        }
        
        if (batchList.isEmpty()) {
            return;
        }
        
        try {
            // Ê¹ÓÃDBConnector¼ì²é±íÊÇ·ñ´æÔÚ£¬²»´æÔÚÔò´´½¨
            if (!DBConnector.tableExists(tableName)) {
                DBConnector.executeDDL(TrackTableManager.getcreateSQL(tableName));
            }
            
            // ×¼±¸ÅúÁ¿²åÈëÊý¾Ý
            List<Object[]> paramsList = new ArrayList<>();
            for (TrackData data : batchList) {
                Object[] params = new Object[]{
                    data.getDeviceNumber(), 
                    data.getDeviceName(), 
                    data.getxCoordinate(), 
                    data.getxCoordinate(),
                    data.getxCoordinate(), 
                    data.getLayer(), 
                    data.getBattery(), 
                    data.getUtcTimes(), 
                    data.getLatitude(),
                    data.getLongitude(), 
                    data.getPositioningQuality(), 
                    data.getSatelliteCount(), 
                    data.getHdop(),
                    data.getAltitude(), 
                    data.getGeoidHeight(), 
                    data.getDifferentialTime(), 
                    data.getSource(),
                    data.getCompany(), 
                    data.getSaveTime()
                };
                paramsList.add(params);
            }
            
            // Ê¹ÓÃDBConnector½øÐÐÅúÁ¿²åÈë
            String sql = "INSERT INTO " + tableName + " (device_number, device_name, x_coordinate, " +
                       "y_coordinate, z_coordinate, layer, battery, utc_times, latitude, longitude, " +
                       "positioning_quality, satellite_count, hdop, altitude, geoid_height, " +
                       "differential_time, source, company, save_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
            
            DBConnector.executeBatchUpdate(sql, paramsList);
            
        } catch (Exception e) {
            System.err.println("ÅúÁ¿²åÈëÊý¾Ý±íÒì³£: " + e.getMessage());
            // ½«´íÎóµÄÊý¾ÝÖØÐ·ŻضÓÁÐ
            trackDataQueue.addAll(batchList);
        }
    }
    
    // ¹Ø±ÕÅúÁ¿²åÈë·þÎñ
    public static void shutdown() {
        batchInsertExecutor.shutdown();
        try {
            if (!batchInsertExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                batchInsertExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            batchInsertExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}