Files
dsk-iac/ansible/01_old/roles/test/files/k8s_status
2023-12-19 13:36:16 +09:00

87 lines
2.7 KiB
Python
Executable File

#! /usr/bin/python3
#-*- coding:utf-8 -*-
import os, sys, subprocess, io, time
from kubernetes import client, config
def debug_print(msg):
    """Print *msg* to stdout behind the script's ' # ' marker."""
    # print(" # ", msg) joins its two arguments with a single space, so the
    # marker is followed by two spaces in total before the message text.
    print(" #  " + str(msg))
def k8s_conn(KUBE_CONFIG_PATH):
    """Load the kubeconfig file at *KUBE_CONFIG_PATH* and return a CoreV1Api client."""
    config.load_kube_config(config_file=KUBE_CONFIG_PATH)
    return client.CoreV1Api()
def k8s_get_pod(k8s_api, namespace, target=''):
    """List the pods of *namespace* as "<pod name> <phase>" strings.

    Args:
        k8s_api: object exposing ``list_namespaced_pod`` (the value returned
            by ``k8s_conn``).
        namespace: namespace whose pods are listed.
        target: optional substring of a pod name. When it is not the empty
            string, the entry of the first pod whose name contains it is
            returned on its own.

    Returns:
        A single "<pod name> <phase>" string when *target* matched a pod,
        otherwise the list of entries for every pod (this is also what is
        returned when *target* is given but matches nothing).
    """
    listing = k8s_api.list_namespaced_pod(
        namespace,
        pretty=False,
        timeout_seconds=30,
        watch=False,
    )
    filtering = target != ''
    entries = []
    for item in listing.items:
        phase = item.status.phase
        pod_name = item.metadata.name
        entry = pod_name + " " + phase
        if filtering and target in pod_name:
            # First match wins; the entries collected so far are discarded.
            return entry
        entries.append(entry)
    return entries
def k8s_pod_status_check(k8s_api, waiting_time, namespace,except_pod=False):
    """Poll *namespace* until none of its considered pods is in phase Pending.

    Each round fetches the pod entries via ``k8s_get_pod``; if any entry that
    is not excluded has the phase ``Pending`` the function sleeps
    ``int(waiting_time)`` seconds and tries again. There is no upper bound on
    the number of rounds. Only ``Pending`` blocks the loop; any other phase
    is accepted.

    Args:
        k8s_api: API object passed through to ``k8s_get_pod``.
        waiting_time: seconds to sleep between rounds (converted with ``int``).
        namespace: namespace to inspect.
        except_pod: falsy to consider every pod, otherwise a substring;
            entries whose lower-cased "<name> <phase>" text contains it are
            skipped.

    Returns:
        None. The final pod entries are printed through ``debug_print``.
    """
    # ``debug_mode`` is only assigned under the ``__main__`` guard. Look it up
    # defensively so that calling this function from an importing module does
    # not raise NameError.
    verbose = globals().get('debug_mode', False)
    attempt = 0
    while True:
        attempt += 1
        resp = k8s_get_pod(k8s_api, namespace)
        all_run_flag = True
        if verbose:
            debug_print('-' * 30)
            debug_print('pod 상태 체크시도 : {} ({}s)'.format(attempt, waiting_time))
            debug_print('-' * 30)
        for entry in resp:
            if except_pod and except_pod in entry.lower():
                continue
            # Entries are "<pod name> <phase>". Compare the phase token only,
            # so that a pod whose *name* contains "pending" is not mistaken
            # for a Pending pod (which would keep this loop waiting forever).
            phase = entry.rpartition(' ')[2]
            if phase.lower() == 'pending':
                all_run_flag = False
            # NOTE(review): the original indentation was lost; this per-pod
            # line is assumed to be printed for every considered pod and
            # regardless of debug_mode -- confirm against the real file.
            debug_print('{} 결과: {}'.format(entry, all_run_flag))
        if all_run_flag:
            if verbose:
                debug_print('-' * 30)
                debug_print('[{}] pod All Running'.format(namespace))
                debug_print('-' * 30)
            # NOTE(review): assumed to be unconditional (outside the
            # debug_mode branch) -- confirm against the real file.
            for entry in resp:
                debug_print(entry)
            break
        time.sleep(int(waiting_time))
def main():
    """Read the command line and wait for the namespace's pods.

    Positional arguments taken from ``sys.argv``:
        1. namespace -- required; IndexError is raised when it is missing.
        2. substring of pod entries to ignore -- optional, defaults to ''.
        3. kubeconfig path -- optional. When given it is also exported as the
           ``KUBECONFIG`` environment variable; otherwise the existing
           ``KUBECONFIG`` variable is used (KeyError when it is not set).

    The pods are polled every 60 seconds.
    """
    argv = sys.argv
    namespace = argv[1]

    # Only a missing argument is an expected condition here, so catch
    # IndexError specifically instead of using a bare ``except``.
    try:
        Except_k8s_pod = argv[2]
    except IndexError:
        Except_k8s_pod = ''

    try:
        KUBE_CONFIG_PATH = argv[3]
    except IndexError:
        KUBE_CONFIG_PATH = os.environ["KUBECONFIG"]
    else:
        os.environ["KUBECONFIG"] = KUBE_CONFIG_PATH

    k8s_api = k8s_conn(KUBE_CONFIG_PATH)
    k8s_pod_status_check(k8s_api, 60, namespace, Except_k8s_pod)
if __name__ == "__main__":
    # Module-level switch read by k8s_pod_status_check to enable the extra
    # banner output.
    debug_mode = False
    try:
        main()
    except Exception as err:
        print("[Usage] k8s_status {namespace} {Except_pod=(default=false)} {KUBECONFIG_PATH=(default=current env)}")
        print(err)
        # Report the failure through the exit status as well; previously the
        # script exited with 0 even when it could not run, so callers had no
        # way to detect the error.
        sys.exit(1)