Hi all,
We’re building a spot and derivatives exchange that I outlined here https://www.singlestore.com/blog/reverse-engineer-chicago-mercantile-exchange/
We just squash-merged our latest PR, which adds the shielded operations for our custodians. A snippet of the code is below:
from decimal import Decimal
from bitcoinrpc.authproxy import AuthServiceProxy
def crypto_get_new_address(wallet_rpc_url: str, user_id: str, address_type: str) -> str:
    """Ask the wallet node for a fresh shielded address.

    Args:
        wallet_rpc_url: URL handed to ``AuthServiceProxy`` to reach the wallet.
        user_id: Kept so existing callers keep working, but no longer sent to
            the node: ``z_getnewaddress`` accepts only the optional address
            type and has no label argument like transparent ``getnewaddress``.
        address_type: Shielded address type forwarded to ``z_getnewaddress``.

    Returns:
        The value returned by the ``z_getnewaddress`` RPC.
    """
    wallet_rpc_proxy = AuthServiceProxy(wallet_rpc_url)
    # shielded getnewaddress == z_getnewaddress.
    # Passing user_id as the first positional argument made the node treat it
    # as the address type (and reject the extra parameter), so only the type
    # is forwarded now.
    # NOTE(review): the node does not record which user owns the address, so
    # the caller presumably has to persist the user_id -> address mapping.
    new_crypto_address = wallet_rpc_proxy.z_getnewaddress(address_type)
    del wallet_rpc_proxy
    return new_crypto_address
def crypto_get_received_by_address(wallet_rpc_url: str, crypto_address: str, deposit_min_confirmation: int) -> list:
    """List what a shielded address has received.

    Args:
        wallet_rpc_url: URL handed to ``AuthServiceProxy`` to reach the wallet.
        crypto_address: Address forwarded to ``z_listreceivedbyaddress``.
        deposit_min_confirmation: Minimum confirmation count forwarded as the
            second RPC argument.

    Returns:
        The raw ``z_listreceivedbyaddress`` result, unmodified. The RPC
        returns a list of per-note records rather than a single total, so the
        annotation is ``list`` (it previously claimed ``int``).
    """
    wallet_rpc_proxy = AuthServiceProxy(wallet_rpc_url)
    # shielded getreceivedbyaddress == z_listreceivedbyaddress
    # NOTE(review): unlike getreceivedbyaddress this is not a summed amount;
    # callers that need a total presumably have to add up the entries.
    received_by_address = wallet_rpc_proxy.z_listreceivedbyaddress(crypto_address, deposit_min_confirmation)
    del wallet_rpc_proxy
    return received_by_address
def crypto_get_balance(wallet_rpc_url: str, min_confirmation: int) -> Decimal:
    """Query the wallet balance through the shielded ``z_getbalance`` RPC.

    Args:
        wallet_rpc_url: URL handed to ``AuthServiceProxy`` to reach the wallet.
        min_confirmation: Minimum confirmation count forwarded to the RPC.

    Returns:
        The value returned by ``z_getbalance("*", min_confirmation)``.
    """
    # z_getbalance is the shielded counterpart of getbalance.
    # NOTE(review): "*" is the wildcard used by transparent getbalance;
    # confirm the node accepts it for z_getbalance, which is documented as
    # taking a wallet address.
    return AuthServiceProxy(wallet_rpc_url).z_getbalance("*", min_confirmation)
def crypto_get_confirmed_and_unconfirmed_balance(wallet_rpc_url: str, min_confirmation: int) -> (Decimal, Decimal):
    """Fetch the balance at two confirmation depths over one proxy.

    Args:
        wallet_rpc_url: URL handed to ``AuthServiceProxy`` to reach the wallet.
        min_confirmation: Confirmation depth used for the first query.

    Returns:
        A 2-tuple: the ``z_getbalance`` result at ``min_confirmation``,
        followed by the ``z_getbalance`` result at zero confirmations.
    """
    rpc = AuthServiceProxy(wallet_rpc_url)
    # z_getbalance is the shielded counterpart of getbalance.
    # NOTE(review): "*" is the transparent-getbalance wildcard; confirm the
    # node accepts it here.
    # NOTE(review): the second value is queried with minconf 0, so it
    # presumably includes confirmed funds as well -- verify against callers.
    balances = (
        rpc.z_getbalance("*", min_confirmation),
        rpc.z_getbalance("*", 0),
    )
    del rpc
    return balances
def crypto_get_received_by_label(wallet_rpc_url: str, user_id: str, deposit_min_confirmation: int) -> Decimal:
    """Look up the amount received under a label via ``getreceivedbylabel``.

    Args:
        wallet_rpc_url: URL handed to ``AuthServiceProxy`` to reach the wallet.
        user_id: Passed to the RPC as the label.
        deposit_min_confirmation: Minimum confirmation count forwarded to the RPC.

    Returns:
        The value returned by the ``getreceivedbylabel`` RPC.
    """
    # NOTE(review): this still calls the transparent label RPC; there is no
    # z_* equivalent used here -- confirm the target node exposes it.
    return AuthServiceProxy(wallet_rpc_url).getreceivedbylabel(user_id, deposit_min_confirmation)
# def crypto_get_list_transactions(wallet_rpc_url: str, user_id: str, user_tx_limit: int) -> list:
# wallet_rpc_proxy = AuthServiceProxy(wallet_rpc_url)
# list_transactions = wallet_rpc_proxy.listtransactions(user_id, user_tx_limit)
# del wallet_rpc_proxy
# return list_transactions
def crypto_get_transaction_info(wallet_rpc_url: str, tx_id: str) -> dict:
    """Fetch details of a transaction through ``z_viewtransaction``.

    Args:
        wallet_rpc_url: URL handed to ``AuthServiceProxy`` to reach the wallet.
        tx_id: Transaction identifier forwarded to the RPC.

    Returns:
        The value returned by the ``z_viewtransaction`` RPC.
    """
    # z_viewtransaction is the shielded counterpart of gettransaction.
    return AuthServiceProxy(wallet_rpc_url).z_viewtransaction(tx_id)
def crypto_send_to_address(
    wallet_rpc_url: str,
    destination_address: str,
    amount: Decimal,
    transfer_min_confirmation: int,
    from_address: str = "",
) -> str:
    """Send ``amount`` to one destination through the shielded ``z_sendmany`` RPC.

    The previous body wrapped every argument in ``{...}``, which builds a
    Python ``set``: it cannot be JSON-encoded for the RPC request, the two
    ``""`` entries collapse into one, and element order is undefined. The
    layout also mirrored transparent ``sendtoaddress`` (comment, comment-to,
    subtract-fee, replaceable), none of which ``z_sendmany`` accepts. The
    call now uses the documented ``z_sendmany`` shape:
    ``fromaddress, [{"address": ..., "amount": ...}], minconf``.

    Args:
        wallet_rpc_url: URL handed to ``AuthServiceProxy`` to reach the wallet.
        destination_address: Recipient address.
        amount: Amount to send to the recipient.
        transfer_min_confirmation: Minimum confirmations required of the
            funds being spent (the RPC's ``minconf`` argument).
        from_address: Source address passed as the RPC's first argument.
            Defaults to ``""`` to match what ``crypto_send_to_many`` sends.
            NOTE(review): the node presumably requires a real wallet
            address here -- confirm and pass one explicitly.

    Returns:
        The value returned by ``z_sendmany``.
        NOTE(review): ``z_sendmany`` is documented as returning an operation
        id to poll rather than a transaction id -- confirm callers handle
        that.
    """
    wallet_rpc_proxy = AuthServiceProxy(wallet_rpc_url)
    # sendtoaddress does not exist for shielded Zcash transactions;
    # a single-recipient z_sendmany is the equivalent.
    recipients = [{"address": destination_address, "amount": amount}]
    operation_id = wallet_rpc_proxy.z_sendmany(from_address, recipients, transfer_min_confirmation)
    del wallet_rpc_proxy
    return operation_id
def crypto_set_fee(wallet_rpc_url: str, fee: int):
    """Set the wallet's transaction fee via the ``settxfee`` RPC.

    Args:
        wallet_rpc_url: URL handed to ``AuthServiceProxy`` to reach the wallet.
        fee: Fee value forwarded unchanged to ``settxfee``.

    Returns:
        None; the RPC result is discarded.
    """
    AuthServiceProxy(wallet_rpc_url).settxfee(fee)
def crypto_send_to_many(wallet_rpc_url: str, batch_approved_transaction: dict) -> dict:
    """Submit a batch of approved transfers through ``z_sendmany``.

    Args:
        wallet_rpc_url: URL handed to ``AuthServiceProxy`` to reach the wallet.
        batch_approved_transaction: Payload forwarded unchanged as the second
            RPC argument.

    Returns:
        The value returned by ``z_sendmany("", batch_approved_transaction)``.
    """
    # NOTE(review): z_sendmany is documented as taking a source address plus
    # an array of {"address", "amount"} objects and returning an operation
    # id; the "" source and dict payload look carried over from transparent
    # sendmany -- confirm against the node.
    return AuthServiceProxy(wallet_rpc_url).z_sendmany("", batch_approved_transaction)
def crypto_estimate_smart_fee(wallet_rpc_url: str, balance_confirmation: str, estimate_mode: str) -> dict:
    """Ask the node for a fee estimate via the ``estimatefee`` RPC.

    Args:
        wallet_rpc_url: URL handed to ``AuthServiceProxy`` to reach the wallet.
        balance_confirmation: Forwarded as the first RPC argument.
        estimate_mode: Forwarded as the second RPC argument.

    Returns:
        The value returned by the ``estimatefee`` RPC. The function used to
        end with ``return fee``, an undefined name, so every call raised
        ``NameError`` after the RPC had completed.
    """
    wallet_rpc_proxy = AuthServiceProxy(wallet_rpc_url)
    # NOTE(review): estimatefee is documented as taking only a block count
    # and returning a number; the (conf_target, estimate_mode) arguments and
    # the dict annotation look carried over from estimatesmartfee -- confirm
    # against the node.
    fees = wallet_rpc_proxy.estimatefee(balance_confirmation, estimate_mode)
    return fees