Secure access to Google Cloud Resources

Antoine Castex
5 min readMay 5, 2020

When your organization is growing quickly their use of public cloud, security should not be left as an afterthought.

One of the most important aspect is the ServiceAccount (we call it SA) and the keys that are attached to it.

The SA is not the problem here, as Google recommends replacing default SA’s with specific ones and separate them for each services used in GCP.

The biggest problem is the external KEY, we can call it exfiltration of data, we can have multiple people sharing the same key, let’s face it : it’s out of control !!!

Most of the time people just create the JSON Key because it’s suggested by default, and because in the past it was mandatory on many many API’s.

Now when you have 2 differents services that need to talk to one another on GCP you just have to use the get_application_default() on your code with the GCP service called (Just don’t forget to use a specific service account different for each component, for example for a specific Cloud Function with the least privileges principle)

from oauth2client.client import GoogleCredentials
CREDENTIALS = GoogleCredentials.get_application_default()

On top of that there is a feature called Organizational Policy, where you have many differents possibilities

Today we chose not to deploy this policy :

Disable service account creation

This Boolean constraint disables the creation of service accounts where this constraint is set to ‘True’. By default, service accounts can be created by users based on their Cloud IAM roles and permissions.

BUT to we chose deploy this policy :

Disable service account key creation

This Boolean constraint disables the creation of service account external keys where this constraint is set to ‘True’. By default, service account external keys can be created by users based on their Cloud IAM roles and permissions.

Because for us as i’ve said before, SA is not a problem, but having to many external Keys is a real problem !

So now that it’s enabled when someone tries to generate a KEY they got this message :

Create failed. Error 400: Key creation is not allowed on this service account.

That’s awesome !

But…sometime it’s impossible to not accept a key, because some external tools require that as an authentication method (we don’t mention them but imagine another Cloud Provider trying to communicate with GCP for example)

So we decided to provide a solution for people who really need an external Key.

A form to request a key, and after approvalwe want everything to be done automatically because we don’t really like to spend all day creating external key manually….

Here is what we will do :

Now let me give you the sample code of each step (currently i’m doing every step in just one main function)

I’m mostly using the Discovery API because the libraries are not equivalent, some times the features that we need are not yet available.

Step 1

A message sent to Cloud Pub/Sub triggers a Cloud Function
No code needed here because because it’s automatically done by Google

Step 2

The org policy is temporary disabled on the specified project

from googleapiclient import discoverySERVICE_RESOURCE_MANAGER = discovery.build(‘cloudresourcemanager’, ‘v1’,
credentials=CREDENTIALS)
resource = ‘projects/’+project_id
disable_org_policy_request_body = {
“policy”: {
“constraint”: "constraints/iam.disableServiceAccountKeyCreation",
“booleanPolicy”:
{
“enforced”: False}}}
def disable_policy_operator(resource, body):request = SERVICE_RESOURCE_MANAGER.projects().setOrgPolicy(resource=resource, body=disable_org_policy_request_body)
response = request.execute()
return response

Step 3

The different required API’s are enabled :

  • cloudapis.googleapis.com to intialize default APIs for a new project
  • cloudbilling.googleapis.com for the next step
  • billingbudgets.googleapis.com for the next step
  • serviceusage.googleapis.com for enabling APIs
  • cloudresourcemanager.googleapis.com
  • iam.googleapis.com for the IAM Service Account KEY generation
from googleapiclient import discoverySERVICE_USAGE_MANAGER = discovery.build('serviceusage', 'v1',
credentials=CREDENTIALS)
APIS = [
‘cloudapis.googleapis.com’,
‘cloudbilling.googleapis.com’,
‘serviceusage.googleapis.com’,
‘cloudresourcemanager.googleapis.com’,
‘iam.googleapis.com’,
‘billingbudgets.googleapis.com’]
project_id = “projects/” + project_name
body = {“serviceIds”: batch_apis}
response = enable_batch_apis_operator(project_id, body)def enable_batch_apis_operator(project_id, body):request = SERVICE_USAGE_MANAGER.services().batchEnable(parent=project_id, body=body)
response = request.execute()
return response

Step 4

The billing API is called to know if the project is linked to a billing or not (otherwise we add it and create a small budget alert using an API too)

from googleapiclient import discoverySERVICE_CLOUD_BILLING = discovery.build('cloudbilling', 'v1', credentials=CREDENTIALS)request = SERVICE_CLOUD_BILLING.projects().getBillingInfo(name="projects/"{project_id})
response = request.execute()
return response

Step 5

We enable the SecretManager API (it’s mandatory to have a billing account before enabling it) secretmanager.googleapis.com

from googleapiclient import discoverySERVICE_USAGE_MANAGER = discovery.build('serviceusage', 'v1',credentials=CREDENTIALS)body = “projects/” + project_name + “/services/secretmanager.googleapis.com”request = SERVICE_USAGE_MANAGER.services().enable(name=body)
response = request.execute()

Step 6

We generate the JSON KEY without storing it anywhere

from googleapiclient import discoverySERVICE_ACCOUNT_MANAGER = discovery.build('iam', 'v1', credentials=CREDENTIALS)request = SERVICE_ACCOUNT_MANAGER.projects().serviceAccounts().keys().create(name=’projects/’+project_id+’/serviceAccounts/’ + service_account_email, body={})response = request.execute()
return response[‘privateKeyData’]

Step 7

We send it to SecretManager and :

  • First, initate the connection to the SecretManager
from google.cloud import secretmanager
SECRET_MANAGER = secretmanager.SecretManagerServiceClient()
  • Check if a Secret for this SA already exists
def check_secret_version_operator(secret_id, project):name = SECRET_MANAGER.secret_version_path(project, secret_id, 'latest')
response = SECRET_MANAGER.access_secret_version(name)
return response
  • if it already exists we simply just generate a new version
def create_secret_version_operator(secret_id, key, project):key = base64.b64decode(key)
name = SECRET_MANAGER.secret_version_path(project, secret_id, 'latest')
response = SECRET_MANAGER.access_secret_version(name)
logging.info(response)
try:
name = SECRET_MANAGER.secret_version_path(project, secret_id, 'v1')
response = SECRET_MANAGER.access_secret_version(name)
logging.info(response)
except:
parent = SECRET_MANAGER.secret_path(project, secret_id)
request = SECRET_MANAGER.add_secret_version(parent, {'data': key})
response = SECRET_MANAGER.access_secret_version(request.name)
payload = response.payload.data.decode('UTF-8')
logging.info('Plaintext: {}'.format(payload))
logging.info('Created secret: {}'.format(response.name))
return response
  • If not we create a Secret with the first version :
def create_secret_operator(project_id, secret_id):parent = SECRET_MANAGER.project_path(project_id)
secret = SECRET_MANAGER.create_secret(parent, secret_id, {'replication': {'automatic': {},},})
return secret_id

def create_secret_version_operator(secret_id, key, project):
key = base64.b64decode(key)
name = SECRET_MANAGER.secret_version_path(project, secret_id, 'latest')
response = SECRET_MANAGER.access_secret_version(name)
logging.info(response)
try:
name = SECRET_MANAGER.secret_version_path(project, secret_id, 'v1')
response = SECRET_MANAGER.access_secret_version(name)
logging.info(response)
except:
parent = SECRET_MANAGER.secret_path(project, secret_id)
request = SECRET_MANAGER.add_secret_version(parent, {'data': key})
response = SECRET_MANAGER.access_secret_version(request.name)
payload = response.payload.data.decode('UTF-8')
logging.info('Plaintext: {}'.format(payload))
logging.info('Created secret: {}'.format(response.name))
return response

Step 8

We re-enable the policy on the project

from googleapiclient import discoverySERVICE_RESOURCE_MANAGER = discovery.build('cloudresourcemanager', 'v1',credentials=CREDENTIALS)resource = ‘projects/’+project_name
set_org_policy_request_body = {“constraint”: "constraints/iam.disableServiceAccountKeyCreation"}
request = SERVICE_RESOURCE_MANAGER.projects().clearOrgPolicy(resource=resource, body=set_org_policy_request_body)response = request.execute()

We return the gcloud command to the user to get his key from Secret Manager

Every step is monitored, with Error Reporting, with just this sample of code :

from google.cloud import error_reporting
client = error_reporting.Client()

try:
raise RuntimeError('I failed you')
except RuntimeError:
client.report_exception()

And we send an email (with SendGRID) to the team administrator if a step failed to be instantly informed :

def send_email(project_id_target, owner, status, step):sg = SendGridAPIClient("your_sendgrid_api_key")subject=f”Google Cloud Service Account JSON Key Creation”if status == ‘ERROR’:subject=f”Google Cloud Service Account JSON Key Creation Error, {step}”html_content = f”””<p>Hello, the project named {project_id_target},</p><p>with {owner} as owner</p>“””message = Mail(to_emails= email_var(),from_email=Email(email_var(), “Google Cloud Team”),subject=subject,html_content=html_content)try:response = sg.send(message)return f”email.status_code={response.status_code}”except HTTPError as e:return e.message

The next steps will be to offer a rotation of every key that we give and clean up the old ones… See you on the next episode !

--

--