@Override publicvoidgetProfileTaskCommands(ProfileTaskCommandQuery request, StreamObserver<Commands> responseObserver){ // query profile task list by service id final String serviceId = IDManager.ServiceID.buildId(request.getService(), NodeType.Normal); final String serviceInstanceId = IDManager.ServiceInstanceID.buildId(serviceId, request.getServiceInstance()); // 从缓存中获取该服务的 profile task 列表,该缓存每隔 10s 从存储中更新 profile task final List<ProfileTask> profileTaskList = profileTaskCache.getProfileTaskList(serviceId); if (CollectionUtils.isEmpty(profileTaskList)) { responseObserver.onNext(Commands.newBuilder().build()); responseObserver.onCompleted(); return; }
// build command list final Commands.Builder commandsBuilder = Commands.newBuilder(); finallong lastCommandTime = request.getLastCommandTime();
for (ProfileTask profileTask : profileTaskList) { // if command create time less than last command time, means sniffer already have task if (profileTask.getCreateTime() <= lastCommandTime) { continue; }
// record profile task log recordProfileTaskLog(profileTask, serviceInstanceId, ProfileTaskLogOperationType.NOTIFIED);
/** * add profile task from OAP */ publicvoidaddProfileTask(ProfileTask task){ // update last command create time if (task.getCreateTime() > lastCommandCreateTime) { lastCommandCreateTime = task.getCreateTime(); }
// check profile task limit final CheckResult dataError = checkProfileTaskSuccess(task); if (!dataError.isSuccess()) { LOGGER.warn( "check command error, cannot process this profile task. reason: {}", dataError.getErrorReason()); return; }
// add task to list profileTaskList.add(task);
// schedule to start task,计算出该任务还需要多久才能启动,通过定时任务实现 long timeToProcessMills = task.getStartTime() - System.currentTimeMillis(); PROFILE_TASK_SCHEDULE.schedule(() -> processProfileTask(task), timeToProcessMills, TimeUnit.MILLISECONDS); }
/** * active the selected profile task to execution task, and start a removal task for it. */ privatesynchronizedvoidprocessProfileTask(ProfileTask task){ // make sure prev profile task already stopped stopCurrentProfileTask(taskExecutionContext.get());
// make stop task schedule and task context,通过上下文保存该 profile task,该上下文也实际在 trace 记录时塞入堆栈信息 final ProfileTaskExecutionContext currentStartedTaskContext = new ProfileTaskExecutionContext(task); taskExecutionContext.set(currentStartedTaskContext);
int maxSleepPeriod = executionContext.getTask().getThreadDumpPeriod();
// run loop when current thread still running long currentLoopStartTime = -1; while (!Thread.currentThread().isInterrupted()) { currentLoopStartTime = System.currentTimeMillis();
// each all slot AtomicReferenceArray<ThreadProfiler> profilers = executionContext.threadProfilerSlots(); int profilerCount = profilers.length(); for (int slot = 0; slot < profilerCount; slot++) { ThreadProfiler currentProfiler = profilers.get(slot); if (currentProfiler == null) { continue; }
// sleep to next period // if out of period, sleep one period long needToSleep = (currentLoopStartTime + maxSleepPeriod) - System.currentTimeMillis(); needToSleep = needToSleep > 0 ? needToSleep : maxSleepPeriod; Thread.sleep(needToSleep); } }
// build database data final ProfileThreadSnapshotRecord record = new ProfileThreadSnapshotRecord(); record.setTaskId(snapshot.getTaskId()); record.setSegmentId(snapshot.getTraceSegmentId()); record.setDumpTime(snapshot.getTime()); record.setSequence(snapshot.getSequence()); record.setStackBinary(snapshot.getStack().toByteArray()); record.setTimeBucket(TimeBucket.getRecordTimeBucket(snapshot.getTime()));
/** * check have available slot to profile and add it * * @return is add profile success */ public ProfileStatusReference attemptProfiling(TracingContext tracingContext, String traceSegmentId, String firstSpanOPName){ // check has available slot finalint usingSlotCount = currentProfilingCount.get(); if (usingSlotCount >= Config.Profile.MAX_PARALLEL) { return ProfileStatusReference.createWithNone(); }
// check first operation name matches if (!Objects.equals(task.getFirstSpanOPName(), firstSpanOPName)) { return ProfileStatusReference.createWithNone(); }
// if out limit started profiling count then stop add profiling if (totalStartedProfilingCount.get() > task.getMaxSamplingCount()) { return ProfileStatusReference.createWithNone(); }
// try to occupy slot if (!currentProfilingCount.compareAndSet(usingSlotCount, usingSlotCount + 1)) { return ProfileStatusReference.createWithNone(); } // 将当前线程 Thread.currentThread() 赋值给 ThreadProfiler 的 profilingThread 参数 // 同时将 threadProfiler 存入 profilingSegmentSlots 中,profilingSegmentSlots 会被 profileThread 取出 final ThreadProfiler threadProfiler = new ThreadProfiler( tracingContext, traceSegmentId, Thread.currentThread(), this); int slotLength = profilingSegmentSlots.length(); for (int slot = 0; slot < slotLength; slot++) { if (profilingSegmentSlots.compareAndSet(slot, null, threadProfiler)) { return threadProfiler.profilingStatus(); } } return ProfileStatusReference.createWithNone(); }
例如，当前只有一个 HTTP 请求：
在 before method 执行前，将该请求的堆栈信息初始化并发送给 OAP，该信息包含 traceId 以及 beginTime。