Create OCI Bastion Sessions with Python SDK

Bastion Service

Oracle Cloud Infrastructure (OCI) Bastion is a fully managed service that provides secure access to private OCI resources. With it, you can avoid exposing your internal OCI resources to the internet through public IPs. Resources with public IPs attract a lot of unwanted attention on the internet, and the Bastion service lets you avoid that without introducing complex security architectures. To learn more about the OCI Bastion service, check the official guide.

Use Case

Since I work on Oracle Cloud VMware Solution (OCVS), I need to connect to my lab OCVS instances. I use the architecture below to connect to them without using public IPs.

  • I connect to the OCI Bastion service using a public/private key pair.
  • I use SSH port forwarding over the key-authenticated session to RDP to my scripting server running in OCI.
  • Bastion provides me with a time-bound session to my scripting host.
  • From this host, I can access my OCVS environment. Note that a jump box is not mandatory for this access; my use case is for development purposes.
  • The session disconnects after a specified time.
Port Forwarding Using Bastion Service
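
To make the port-forwarding step concrete, here is a minimal sketch of the kind of OpenSSH local-forward command involved, assembled in Python. The helper name `build_port_forward_command`, the bastion user and host values, the target IP, and the key path are placeholders invented for illustration; the real command is supplied by the Bastion service for each session.

```python
import shlex


def build_port_forward_command(private_key, local_port, target_ip, target_port,
                               bastion_user, bastion_host):
    """Return an ssh argument list forwarding local_port to target_ip:target_port."""
    return [
        "ssh",
        "-i", private_key,   # private key matching the session's public key
        "-N",                # do not run a remote command, only keep the tunnel open
        "-L", f"{local_port}:{target_ip}:{target_port}",  # local port forward
        "-p", "22",
        f"{bastion_user}@{bastion_host}",
    ]


if __name__ == "__main__":
    # All values below are hypothetical examples.
    args = build_port_forward_command(
        "~/.ssh/id_rsa", 3389, "10.0.0.10", 3389,
        "ocid1.bastionsession.oc1.example", "host.bastion.example.com",
    )
    print(shlex.join(args))
```

With a tunnel like this running, an RDP client pointed at localhost on the local port reaches the private host through the bastion.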

Challenge

  • The process of connecting to a session is not complex, but over time it becomes repetitive and boring to navigate the UI.
  • The sessions are time-bound. The maximum lifetime of a bastion session is 3 hours, so you need to repeat the UI connection process every 3 hours.
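
As a quick illustration of what the 3-hour limit means in practice, the arithmetic can be sketched as follows. The function name `sessions_needed` is made up for this example; the only fact taken from the text is the 3-hour maximum.

```python
import math

MAX_SESSION_TTL_SECONDS = 3 * 60 * 60  # the 3-hour maximum mentioned above


def sessions_needed(hours_required, ttl_seconds=MAX_SESSION_TTL_SECONDS):
    """Return how many back-to-back sessions are needed to cover hours_required."""
    if ttl_seconds <= 0:
        raise ValueError("ttl_seconds must be positive")
    return math.ceil(hours_required * 3600 / ttl_seconds)


if __name__ == "__main__":
    print(sessions_needed(8))   # 3 sessions for an 8-hour day
    print(sessions_needed(15))  # 5 sessions for 15 hours
```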

Solution

I thought there had to be a better way to do this, so I decided to explore the OCI Python SDK. I started writing some code and ended up with the workflow below.

With the automation below:

  • The user defines how many sessions the script should create in one run; each new session is created after the previous one has expired.
  • The script creates a session and gets the shell command for port forwarding.
  • It runs the shell command and waits for the session to expire.
  • If the session expires and the maximum session limit has not been reached, a new session is created automatically!
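
The workflow above can be sketched as a small control loop. This is only an outline under assumptions: `run_sessions` and its three callable parameters are hypothetical names for this sketch, and the stand-in callables in the demo do nothing except print.

```python
def run_sessions(max_sessions, create_session, run_tunnel, cleanup):
    """Create up to max_sessions sessions, one after another.

    create_session() returns a session object, run_tunnel(session) blocks
    until the tunnel ends, and cleanup(session) removes the expired session.
    All three are caller-supplied callables.
    """
    completed = 0
    for number in range(1, max_sessions + 1):
        session = create_session()
        try:
            run_tunnel(session)
        finally:
            # Always clean up, even if the tunnel command failed.
            cleanup(session)
        completed = number
    return completed


if __name__ == "__main__":
    total = run_sessions(
        2,
        create_session=lambda: "demo-session",
        run_tunnel=lambda s: print("tunnel running for", s),
        cleanup=lambda s: print("cleaned up", s),
    )
    print("sessions completed:", total)
```

Passing the three steps in as callables keeps the loop itself free of SDK calls, which makes it easy to try out without a real bastion.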

High-Level Script Flow

Code

User Inputs

File Name: userInputsBastionSession.JSON

{
    "bastionID": "ocid1.bastion.oc1.xxxxxx",
    "sessionType": "PORT_FORWARDING",
    "target_resource_operating_system_user_name": "opc",
    "target_resource_id": "ocid1.instance.oc1.xxxxxxxxxx",
    "target_resource_port": 3389,
    "target_resource_private_ip_address": "x.y.x.n",
    "public_key_content": "ssh-rsa xxxxxxxxxxxx",
    "display_name": "FluffyCloudsSession",
    "key_type": "PUB",
    "session_ttl_in_seconds": 10800,
    "privateKey": "<Private Key Location>",
    "localPort": "3389",
    "maxSessionCount":5
}
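
Before handing these inputs to the SDK, it can help to sanity-check them. The sketch below is one possible check, not part of the original script: `validate_user_inputs` is a hypothetical helper, and the rules it applies (all keys present, TTL at most 3 hours, `localPort` given as a string because it is substituted into the SSH command, a positive `maxSessionCount`) are assumptions drawn from this post.

```python
REQUIRED_KEYS = {
    "bastionID", "sessionType", "target_resource_operating_system_user_name",
    "target_resource_id", "target_resource_port",
    "target_resource_private_ip_address", "public_key_content", "display_name",
    "key_type", "session_ttl_in_seconds", "privateKey", "localPort",
    "maxSessionCount",
}
MAX_TTL_SECONDS = 10800  # 3 hours, the limit stated earlier in this post


def validate_user_inputs(data):
    """Return a list of problems found in the parsed inputs (empty if none)."""
    problems = [f"missing key: {key}" for key in sorted(REQUIRED_KEYS - set(data))]
    ttl = data.get("session_ttl_in_seconds")
    if isinstance(ttl, int) and ttl > MAX_TTL_SECONDS:
        problems.append(f"session_ttl_in_seconds exceeds {MAX_TTL_SECONDS}")
    if "localPort" in data and not isinstance(data["localPort"], str):
        problems.append("localPort must be a string")
    max_count = data.get("maxSessionCount")
    if "maxSessionCount" in data and (not isinstance(max_count, int) or max_count < 1):
        problems.append("maxSessionCount must be a positive integer")
    return problems


if __name__ == "__main__":
    # A deliberately incomplete input to show the kind of report produced.
    for problem in validate_user_inputs({"localPort": 3389}):
        print(problem)
```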

create_bastion_session.py

import asyncio
import json
import logging
import sys
import time

import oci

# Basic logging config. The level is set to INFO so that the logging.info()
# progress messages below are actually printed (the default level is WARNING).
logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.INFO)
# Open and parse the user inputs JSON file
try:
    with open('userInputsBastionSession.JSON') as f:
        data = json.load(f)
except json.decoder.JSONDecodeError as err:
    raise ValueError("Invalid user inputs JSON") from err
# Load the OCI config; adjust the path if your config file lives elsewhere
config = oci.config.from_file("~/.oci/configp", "DEFAULT")
# Initialize the service client with the loaded config
bastion_client = oci.bastion.BastionClient(config)
# Global control variables
WaitRefresh = 15  # seconds to wait between status checks
sessionCount = 1
maxSessionCount = data["maxSessionCount"]
# Create a bastion session and wait for it to become active.
def create_bastion_session(userInputs):
    try:
        create_session_response = bastion_client.create_session(create_session_details=oci.bastion.models.CreateSessionDetails(
                bastion_id=userInputs["bastionID"],
                target_resource_details=oci.bastion.models.CreateManagedSshSessionTargetResourceDetails(
                    session_type=userInputs["sessionType"],
                    target_resource_operating_system_user_name=userInputs["target_resource_operating_system_user_name"],
                    target_resource_id=userInputs["target_resource_id"],
                    target_resource_port=userInputs["target_resource_port"],
                    target_resource_private_ip_address=userInputs["target_resource_private_ip_address"]
                    ),
                key_details=oci.bastion.models.PublicKeyDetails(
                    public_key_content=userInputs["public_key_content"]
                    ),
                display_name=userInputs["display_name"],
                key_type=userInputs["key_type"],
                session_ttl_in_seconds=userInputs["session_ttl_in_seconds"])
                )

        get_session_response = bastion_client.get_session(session_id=create_session_response.data.id)
    except Exception as err:
        # Raise a real exception type; raising a plain string is itself a TypeError
        raise RuntimeError("Oops! Error creating the session with the supplied parameters") from err
    activeSession = False
    count = 0
    maxCount = 15
    try:
        # Poll until the session is ACTIVE or maxCount checks have been made
        while not activeSession and count < maxCount:
            get_session_response = bastion_client.get_session(session_id=create_session_response.data.id)
            if get_session_response.data.lifecycle_state == "ACTIVE":
                logging.info("Session has been created and is ACTIVE")
                activeSession = True
                break
            else:
                logging.info("Waiting for session state to be active. Current state: " + str(get_session_response.data.lifecycle_state))
                time.sleep(WaitRefresh)
                count = count + 1
    except Exception:
        logging.exception("Oops! Error getting the session with the session ID")
    return get_session_response.data
#Get the ssh command to run on the shell. 
def getCommand(session,userInputs):
    sessionCommand = session.ssh_metadata["command"]
    cmd = sessionCommand.replace("<privateKey>",userInputs["privateKey"])
    cmd = cmd.replace("<localPort>",userInputs["localPort"])
    return cmd
#Run the command on the shell. 
def runBastionCmd (sessionCount,maxSessionCount,userInputs):
    if sessionCount > maxSessionCount:
        sys.exit( "Maximum Sessions Reached!")
    session = create_bastion_session(userInputs)
    cmd = getCommand(session,userInputs)
    ttl = session.session_ttl_in_seconds
    #print("TTL of the session is : "+ str(ttl))
    print("Next session will be created after "+ str(ttl)+ " seconds. Taking a nap till then....")
    print("Please connect to OCI Machine. Bastion session is active.")
    asyncio.run(run(cmd))
    wait_for_session_deletion(session.id)
    #print("Session Details: "+ str(session))
    #time.sleep(ttl+60)
    return session
#Run async command and wait for the output. 
async def run(cmd: str):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stderr=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        logging.error(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        logging.info(f'[stdout]\n{stdout.decode()}')
    if stderr:
        logging.error(f'[stderr]\n{stderr.decode()}')
# Check the status of the previous session and wait for it to be deleted.
def wait_for_session_deletion(sessionID):
    session_deletion = False
    tries = 0
    maxTries = 20
    while not session_deletion and tries < maxTries:
        get_session_response = bastion_client.get_session(session_id=sessionID)
        state = get_session_response.data.lifecycle_state
        if state != "DELETED":
            print("Previous session is not deleted yet. Program will take a nap and check back again. Current status is " + str(state))
            # Request deletion only if one is not already in progress
            if state != "DELETING":
                print("Deleting the session..............")
                delete_session_response = bastion_client.delete_session(session_id=sessionID)
                print(delete_session_response.headers)
            time.sleep(WaitRefresh)
            tries = tries + 1
        else:
            print("The previous session has been deleted")
            session_deletion = True
            break

# Run a while loop to create sessions until the maximum session count is reached.
while sessionCount <= maxSessionCount:
    try:
        print("************************************ Session Number: " + str(sessionCount) + ". Maximum sessions allowed: " + str(maxSessionCount) + " ************************************\n")
        ses = runBastionCmd(sessionCount, maxSessionCount, data)
        sessionCount = sessionCount + 1
    except KeyboardInterrupt:
        logging.error('Keyboard interrupt: user pressed Ctrl-C.')
        sys.exit(1)
    except Exception:
        # Catch Exception rather than everything, so sys.exit() is not swallowed
        sessionCount = sessionCount + 1
        logging.exception("Session ended or unable to connect")
        if sessionCount > maxSessionCount:
            print("Maximum session reached")
            sys.exit(1)
    else:
        print("No errors, session expired or deleted")
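
The script polls for a lifecycle state in two places (waiting for ACTIVE and waiting for DELETED). As a minimal sketch of how that pattern could be factored out, the helper below polls any state-returning callable. The name `wait_for_state` and its parameters are hypothetical and not part of the OCI SDK; the defaults simply mirror the 15 checks and 15-second wait used above.

```python
import time


def wait_for_state(fetch_state, wanted, attempts=15, interval=15, sleep=time.sleep):
    """Poll fetch_state() until it returns wanted.

    Returns True if the wanted state was seen within the given number of
    attempts, otherwise False. The sleep function is injectable for testing.
    """
    for attempt in range(attempts):
        if fetch_state() == wanted:
            return True
        if attempt < attempts - 1:
            sleep(interval)  # wait before the next check
    return False


if __name__ == "__main__":
    # Simulated state sequence standing in for real get_session() calls.
    states = iter(["CREATING", "CREATING", "ACTIVE"])
    reached = wait_for_state(lambda: next(states), "ACTIVE", sleep=lambda s: None)
    print("reached ACTIVE:", reached)
```

In the script, `fetch_state` could be a small lambda that calls `bastion_client.get_session(...)` and returns `data.lifecycle_state`.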

Conclusion

With the above code, I kick off the script at the start of my day and it keeps me securely connected to my OCI Windows machine for the entire day without navigating the UI. Set and forget!

