[SPARK] Spark Streaming fails to rename the eventlog .inprogress file when the application is killed via YARN

2017-07-16

When a Spark Streaming application running in YARN cluster mode is killed through YARN, the eventlog is not renamed (its .inprogress suffix is never removed) during stop, so the Spark History Server cannot read the corresponding log.

After some analysis, the cause is this: when a kill is issued in YARN cluster mode, the NodeManager sends two signals, TERM and KILL, 250 ms apart; that is, 250 ms after TERM (kill -15 pid) it follows up with KILL (kill -9 pid). TERM triggers the Spark StreamingContext's JVM shutdown hook, which then calls SparkContext's hook, but the StreamingContext hook may itself wait up to 12 seconds:

// Wait for the Spark job that runs the receivers to be over
// That is, for the receivers to quit gracefully.
receiverJobExitLatch.await(10, TimeUnit.SECONDS)

There is also a wait in JobScheduler:

jobExecutor.awaitTermination(2, TimeUnit.SECONDS)

The 10 s wait above plus the 2 s here add up to the 12 s mentioned earlier, while kill -9 arrives after only 250 ms. So kill -9 executes before streaming has fully stopped, the process exits prematurely, and the streaming job may even lose data.
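The NodeManager's kill sequence can be sketched roughly as the following shell snippet (the pid argument and the hard-coded 0.25 s delay are stand-ins for the container process and the default yarn.nodemanager.sleep-delay-before-sigkill.ms value):

```shell
#!/bin/sh
# Rough sketch of the NodeManager kill sequence described above.
# $1 is the container process pid (hypothetical invocation).
pid=$1

kill -TERM "$pid"              # kill -15: JVM shutdown hooks start running
sleep 0.25                     # default sleep-delay-before-sigkill = 250 ms
kill -KILL "$pid" 2>/dev/null  # kill -9: hooks cut off if still running
```

With the default delay, any shutdown hook that needs more than 250 ms (such as the 12 s of waits above) is cut off mid-flight.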

The fix, then, is to increase yarn.nodemanager.sleep-delay-before-sigkill.ms (default 250 ms) to something above 12 s.

// kill process
if (processId != null) {
  String user = container.getUser();
  LOG.debug("Sending signal to pid " + processId
      + " as user " + user
      + " for container " + containerIdStr);

  final Signal signal = sleepDelayBeforeSigKill > 0
      ? Signal.TERM
      : Signal.KILL;

  boolean result = exec.signalContainer(new ContainerSignalContext.Builder()
      .setContainer(container)
      .setUser(user)
      .setPid(processId)
      .setSignal(signal)
      .build());

  LOG.debug("Sent signal " + signal + " to pid " + processId
      + " as user " + user
      + " for container " + containerIdStr
      + ", result=" + (result ? "success" : "failed"));

  if (sleepDelayBeforeSigKill > 0) {
    new DelayedProcessKiller(container, user,
        processId, sleepDelayBeforeSigKill, Signal.KILL, exec).start();
  }
}
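The suggested configuration change would go in yarn-site.xml on the NodeManagers. The 15000 ms value below is only an example; the point is to comfortably exceed the roughly 12 s the shutdown hooks may wait:

```xml
<property>
  <!-- Delay between SIGTERM and SIGKILL; default is 250 ms.
       Raised above the ~12 s Spark Streaming shutdown-hook waits. -->
  <name>yarn.nodemanager.sleep-delay-before-sigkill.ms</name>
  <value>15000</value>
</property>
```

Note this delay applies to every container killed on the node, so all killed containers will linger up to this long before the final SIGKILL.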