Search
⌃K

Contract deployment with Python

Below is an example of a contract deployment task, followed by an issuance task for the new contract:
from __future__ import print_function
import base64
import time
from datetime import datetime
from uuid import uuid4
from pprint import pprint
# generated via https://github.com/OpenAPITools/openapi-generator from OpenAPI spec
import openapi_client
from openapi_client import (
Configuration, ContractDeployment, Issuance, Role, InvestorInfo,
Burn, ContractSnapshot
)
from openapi_client.rest import ApiException
URL = 'https://sandbox.cashlink-api.de/v1'
CLIENT_ID = '...'
CLIENT_SECRET = '...'
BACKUP_WALLET = '0x11Fd571Ec9A6ac5b3b546F1D4D57330A973317CA'
def authenticate():
''' Create an authenticated client instance. '''
configuration = Configuration()
configuration.host = URL
# Perform OAuth authentication
response, response_code, _ = openapi_client.ApiClient(configuration).call_api(
resource_path='/oauth/token',
method='POST',
body={
"grant_type": "client_credentials",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
},
response_type='dict(str, str)'
)
assert response_code == 200
# Re-create client with access token
client = openapi_client.ApiClient(
configuration,
header_name='Authorization',
header_value=f'Bearer {response["access_token"]}'
)
return client
def main():
''' Deploy a contract and issue securities. '''
client = authenticate()
# Create an instance of the contracts API
api = openapi_client.ContractsApi(client)
# Create a deployment if none exists yet
deployments = api.list_deployments()
if deployments:
deployment = deployments[0]
else:
# Deploy a contract with managed issuer and approver roles
deployment = api.create_deployment(
x_idempotency_key=str(uuid4()),
contract_deployment=ContractDeployment(
asset_name='Awesome Asset',
asset_code='AWE',
initial_supply='1000000',
calculation_factor='100',
issuer=Role(wallet=BACKUP_WALLET, managed=True),
paper_contract_hash=base64.b64encode(b'a' * 32).decode(),
)
)
pprint(deployment)
print('Waiting for contract deployment...')
while True:
deployment = api.get_deployment(deployment.id)
if deployment.status == 'failed':
print('Deployment failed:')
pprint(deployment.error)
return
if deployment.status == 'succeeded':
break
time.sleep(5)
pprint(deployment)
contract = deployment.contract
print('Contract deployed at address ' + contract['address'])
print('Add an investor to default pool')
api_investor = openapi_client.InvestorsApi(client)
investor = api_investor.add_investor_to_pool(
id=contract['default_pool']['id'],
x_idempotency_key='f0bead27-eacf-4fcf-83f1-9347838b7f88',
investor_info=InvestorInfo(wallet='0x52bc44d5378309EE2abF1539BF71dE1b7d7bE3b5'),
)
pprint(investor)
investor_addr = '0x11Fd571Ec9A6ac5b3b546F1D4D57330A973317CA'
# Issue securities
issuance_amount = '1000'
issuance = api.create_issuance(
x_idempotency_key=str(uuid4()),
id=deployment.contract['id'],
issuance=Issuance(
recipient=investor_addr,
amount=issuance_amount,
)
)
print('Waiting for issuance...')
while True:
issuance = api.get_issuance(issuance.id)
if issuance.status in ['failed', 'unknown']:
print('Issuance might have failed:')
pprint(issuance.error)
return
if issuance.status == 'succeeded':
break
time.sleep(5)
pprint(issuance)
# burning
burning = api.create_burn(
id=deployment.contract['id'],
x_idempotency_key=str(uuid4()),
burn=Burn(
holder=investor_addr,
amount='200'
),
)
# save the time before burning:
snapshot_time = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
pprint(burning)
print('Burn some securities ...')
while True:
burn_task = api.get_burn(burning.id)
if burn_task.status in ['failed', 'unknown']:
print('Burning might have failed')
pprint(burn_task.error)
return
if burn_task.status == 'succeeded':
break
time.sleep(5)
pprint(burn_task)
# retrieve holders
snapshot = api.create_contract_snapshot(
id=deployment.contract['id'],
x_idempotency_key=str(uuid4()),
contract_snapshot=ContractSnapshot(
time=snapshot_time
)
)
pprint(snapshot)
print('Waiting for holder listing....')
while True:
snapshot_task = api.get_contract_snapshot(snapshot.id)
if snapshot_task.status in ['failed', 'invalid']:
print('Snapshot failed')
pprint(snapshot_task.error)
return
if snapshot_task.status == 'completed':
break
time.sleep(5)
pprint(snapshot_task)
assert snapshot_task.holders[0].amount == issuance_amount
if __name__ == '__main__':
main()